Compare commits

...

7 Commits

  1. .gitignore (1 change)
  2. pom.xml (10 changes)
  3. src/log4j2.xml (14 changes)
  4. src/org/leolo/rail/nrd/AssoicationHandler.java (144 changes)
  5. src/org/leolo/rail/nrd/AssoicationProcessor.java (86 changes)
  6. src/org/leolo/rail/nrd/ConfigurationManager.java (63 changes)
  7. src/org/leolo/rail/nrd/Constants.java (8 changes)
  8. src/org/leolo/rail/nrd/DatabaseManager.java (102 changes)
  9. src/org/leolo/rail/nrd/FileLoader.java (265 changes)
  10. src/org/leolo/rail/nrd/ScheduleHandler.java (275 changes)
  11. src/org/leolo/rail/nrd/ScheduleProcessor.java (244 changes)
  12. src/org/leolo/rail/nrd/TimetableHandler.java (77 changes)
  13. src/org/leolo/rail/nrd/TimetableHandler2.java (108 changes)
  14. src/org/leolo/rail/nrd/TiplocHandler.java (153 changes)
  15. src/org/leolo/rail/nrd/TiplocProcessor.java (85 changes)
  16. src/org/leolo/rail/nrd/db.sql (88 changes)

1
.gitignore vendored

@@ -1,3 +1,4 @@
/bin/
/target/
/configuration.properties
output.file

10
pom.xml

@@ -28,6 +28,16 @@
<artifactId>json</artifactId>
<version>20210307</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>
</project>

14
src/log4j2.xml

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout
pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>

144
src/org/leolo/rail/nrd/AssoicationHandler.java

@@ -0,0 +1,144 @@
package org.leolo.rail.nrd;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class AssoicationHandler implements Runnable{
private static Logger log = LogManager.getLogger(AssoicationHandler.class);
private final Object SYNC_TOKEN = new Object();
private boolean shutdown = false;
private boolean terminated = false;
private Queue<JSONObject> queue = new LinkedList<>();
public AssoicationHandler() {
new Thread(this).start();
}
public void add(JSONObject obj) {
if(obj!=null) {
queue.add(obj);
}else {
log.warn("Trying to add a null object!");
}
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdown() {
shutdown = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdownAndWait() {
shutdown();
while(true) {
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null);
if(terminated) {
break;
}
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.notifyAll();
SYNC_TOKEN.wait(1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
private int currentSize = 0;
private int commitCount = 0;
@Override
public void run() {
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)")
){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
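// Consumer loop: poll association records off the queue, add them to the batch
// insert, and commit every Constants.BATCH_SIZE rows (plus a final commit on shutdown).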
while(true) {
JSONObject obj = queue.poll();
if(shutdown && (queue.isEmpty()||obj==null)) {
pstmt.executeBatch();
conn.commit();
log.info("Train assoc committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount);
break;
}else if(obj==null) {
continue;
}
synchronized(FileLoader.SYNC_TOKEN) {
FileLoader.SYNC_TOKEN.notifyAll();
}
// log.info("Assoc {}-{}@{}",obj.optString("main_train_uid"),obj.optString("assoc_train_uid"), obj.optString("location"));
pstmt.setString(1, obj.optString("main_train_uid"));
pstmt.setString(2, obj.optString("assoc_train_uid"));
try {
pstmt.setDate(3, new java.sql.Date(sdf.parse(obj.optString("assoc_start_date")).getTime()));
pstmt.setDate(4, new java.sql.Date(sdf.parse(obj.optString("assoc_end_date")).getTime()));
} catch (ParseException e1) {
log.warn("Unable to parse date! {}", e1.getMessage(), e1);
continue;
}
pstmt.setString(5, obj.optString("assoc_days"));
pstmt.setString(6, obj.optString("category"));
pstmt.setString(7, obj.optString("date_indicator"));
pstmt.setString(8, obj.optString("location"));
if(obj.optString("base_location_suffix")!=null)
pstmt.setString(9, obj.optString("base_location_suffix"));
else
pstmt.setNull(9, Types.VARCHAR);
if(obj.optString("assoc_location_suffix")!=null)
pstmt.setString(10, obj.optString("assoc_location_suffix"));
else
pstmt.setNull(10, Types.VARCHAR);
pstmt.setString(11, obj.optString("diagram_type"));
pstmt.setString(12, obj.optString("CIF_stp_indicator"));
pstmt.addBatch();
if(++currentSize>=Constants.BATCH_SIZE) {
pstmt.executeBatch();
conn.commit();
log.info("Train assoc committed {} entry. Commit #{}", currentSize, ++commitCount);
currentSize = 0;
}
if(queue.isEmpty()) {
log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
}catch(SQLException e) {
log.error(e.getMessage(),e);
}
terminated = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public int getQueueSize() {
return queue.size();
}
}

86
src/org/leolo/rail/nrd/AssoicationProcessor.java

@@ -0,0 +1,86 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.zip.GZIPInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class AssoicationProcessor implements Runnable {
private File fileName;
private Logger log = LogManager.getLogger(getClass());
AssoicationProcessor(File fileName){
this.fileName = fileName;
}
@Override
public void run() {
log.info("Processing {}", fileName.getName());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
try(
BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(fileName))));
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO n_train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)")
){
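// Read the gzipped batch file line by line; each line is one JSON association
// record destined for n_train_assoc.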
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
JSONObject obj = new JSONObject(line);
pstmt.setString(1, obj.optString("main_train_uid"));
pstmt.setString(2, obj.optString("assoc_train_uid"));
try {
pstmt.setDate(3, new java.sql.Date(sdf.parse(obj.optString("assoc_start_date")).getTime()));
pstmt.setDate(4, new java.sql.Date(sdf.parse(obj.optString("assoc_end_date")).getTime()));
} catch (ParseException e1) {
log.warn("Unable to parse date! {}", e1.getMessage(), e1);
continue;
}
pstmt.setString(5, obj.optString("assoc_days"));
pstmt.setString(6, obj.optString("category"));
pstmt.setString(7, obj.optString("date_indicator"));
pstmt.setString(8, obj.optString("location"));
if(obj.optString("base_location_suffix")!=null)
pstmt.setString(9, obj.optString("base_location_suffix"));
else
pstmt.setNull(9, Types.VARCHAR);
if(obj.optString("assoc_location_suffix")!=null)
pstmt.setString(10, obj.optString("assoc_location_suffix"));
else
pstmt.setNull(10, Types.VARCHAR);
pstmt.setString(11, obj.optString("diagram_type"));
pstmt.setString(12, obj.optString("CIF_stp_indicator"));
pstmt.addBatch();
}
pstmt.executeBatch();
conn.commit();
log.info("Batch {} committed.", fileName.getName());
} catch (FileNotFoundException e) {
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
if(!fileName.delete()) {
log.warn("Unable to delete {}", fileName.getName());
}
}
}

63
src/org/leolo/rail/nrd/ConfigurationManager.java

@@ -0,0 +1,63 @@
package org.leolo.rail.nrd;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.function.BiConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ConfigurationManager {
private static Logger log = LogManager.getLogger(ConfigurationManager.class);
private static ConfigurationManager instance;
private Properties prop = new Properties();
public synchronized static ConfigurationManager getInstance() {
if(instance==null) {
instance = new ConfigurationManager();
}
return instance;
}
private ConfigurationManager() {
try(FileReader fr = new FileReader("configuration.properties")){
log.debug("Loading properties file");
prop.load(fr);
log.info("{} entries loaded", prop.size());
}catch(IOException e) {
log.fatal(e.getMessage(), e);
System.exit(1);
}
}
public void forEach(BiConsumer<? super Object, ? super Object> action) {
prop.forEach(action);
}
public Object get(Object key) {
return prop.get(key);
}
public Object getOrDefault(Object key, Object defaultValue) {
return prop.getOrDefault(key, defaultValue);
}
public String getProperty(String key, String defaultValue) {
return prop.getProperty(key, defaultValue);
}
public String getProperty(String key) {
return prop.getProperty(key);
}
public int size() {
return prop.size();
}
public boolean containsKey(Object key) {
return prop.containsKey(key);
}
}

8
src/org/leolo/rail/nrd/Constants.java

@@ -0,0 +1,8 @@
package org.leolo.rail.nrd;
public class Constants {
public static final int BATCH_SIZE = 1000;
@Deprecated
public static final int MAX_QUEUE_SIZE = 40000;
}

102
src/org/leolo/rail/nrd/DatabaseManager.java

@@ -0,0 +1,102 @@
package org.leolo.rail.nrd;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.XAConnection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mariadb.jdbc.MariaDbPoolDataSource;
public class DatabaseManager {
private static Logger log = LogManager.getLogger(DatabaseManager.class);
private static DatabaseManager instance;
private MariaDbPoolDataSource ds;
public synchronized static DatabaseManager getInstance() {
if(instance==null) {
instance = new DatabaseManager();
}
return instance;
}
private DatabaseManager() {
ConfigurationManager cm = ConfigurationManager.getInstance();
if(
!cm.containsKey("db.host")||
!cm.containsKey("db.user")||
!cm.containsKey("db.pwd")||
!cm.containsKey("db.name")
) {
log.fatal("Missing required property");
System.exit(1);
}
String url = "jdbc:mariadb://"+cm.getProperty("db.host")+
":"+cm.getProperty("db.port", "3306")+
"/"+cm.getProperty("db.name");
log.info("Connecting to DB {} as {}", url, cm.get("db.user"));
try {
ds = new MariaDbPoolDataSource(url);
ds.setMaxPoolSize(Integer.parseInt(cm.getOrDefault("db.poolsize", "20").toString()));
ds.setUser(cm.getProperty("db.user").toString());
ds.setPassword(cm.getProperty("db.pwd").toString());
} catch (SQLException e) {
log.fatal("Cannot connect to DB",e);
System.exit(-2);
}
}
public boolean testPool() {
try(Connection conn = ds.getConnection()){
try(Statement stmt = conn.createStatement()){
try (ResultSet rs = stmt.executeQuery("SELECT CONNECTION_ID()")){
if(rs.next()) {
log.debug("Connection ID: {}", rs.getString(1));
}
}
}
} catch (SQLException e) {
log.warn("Exception when testing the connection., e");
return false;
}
return true;
}
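// Pooled connections are handed out with auto-commit disabled so callers decide
// when their batches are committed.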
public Connection getConnection() throws SQLException {
Connection conn = ds.getConnection();
conn.setAutoCommit(false);
return conn;
}
public XAConnection getXAConnection() throws SQLException {
XAConnection conn = ds.getXAConnection();
return conn;
}
@Deprecated
public void clear() {
try(
Connection conn = getConnection();
Statement stmt = conn.createStatement();
){
stmt.execute("TRUNCATE TABLE train_assoc");
stmt.execute("TRUNCATE TABLE tiploc");
stmt.execute("TRUNCATE TABLE train_schedule");
stmt.execute("TRUNCATE TABLE train_schedule_detail");
stmt.execute("TRUNCATE TABLE train_schedule_location");
stmt.execute("TRUNCATE TABLE train_error");
}catch(SQLException e) {
log.error(e.getMessage(), e);
}
}
public void shutdown() {
ds.close();
}
}

265
src/org/leolo/rail/nrd/FileLoader.java

@@ -0,0 +1,265 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Base64;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class FileLoader {
private static Logger log = LogManager.getLogger(FileLoader.class);
public static final Object SYNC_TOKEN = new Object();
public static void main(String [] args) {
new FileLoader().run();
}
public void run() {
log.info("Process started");
ConfigurationManager.getInstance().forEach((k,v)->{
if(!((String)k).endsWith(".pwd")) {
log.debug("{} -> {}", k, v);
}else {
log.debug("{} -> ****", k);
}
});
if(DatabaseManager.getInstance().testPool()) {
log.info("Successfully connected to the database");
}
if(!ConfigurationManager.getInstance().containsKey("file.path")) {
log.fatal("Cannot find file path");
System.exit(1);
}
InputStream fis = null;
//TODO: get the file from Network Rail
try {
URL url = new URL(ConfigurationManager.getInstance().getProperty("remote.path"));
URLConnection conn = url.openConnection();
String userpwd = ConfigurationManager.getInstance().getProperty("remote.user")+":"+ConfigurationManager.getInstance().getProperty("remote.pwd");
conn.addRequestProperty("Authorization", "Basic "+Base64.getEncoder().encodeToString(userpwd.getBytes()));
conn.connect();
fis = conn.getInputStream();
}catch(IOException e) {
log.error(e.getMessage(), e);
System.exit(4);
}
log.info("Connected!");
File tempDir = new File(ConfigurationManager.getInstance().getProperty("file.temp_dir", ".")+"/nrdg");
if(!tempDir.exists()) {
tempDir.mkdirs();
}
// DatabaseManager.getInstance().clear();
prepDB();
int countA=0, countT=0, countS=0;
int batchA=0, batchT=0, batchS=0;
try(
GZIPInputStream gis = new GZIPInputStream(fis);
BufferedReader br = new BufferedReader(new InputStreamReader(gis));
){
PrintWriter ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_0"))));
PrintWriter tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_0"))));
PrintWriter sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_0"))));
int count = 0;
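// Split the combined gzipped JSON feed into per-type batch files (association,
// TIPLOC, schedule), rolling over to a new file every Constants.BATCH_SIZE
// records so they can be processed in parallel later.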
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
count++;
JSONObject obj = new JSONObject(line);
String objectType = obj.keys().next();
if("JsonTimetableV1".equals(objectType)) {
}else if("JsonAssociationV1".equals(objectType)){
ass.println(obj.getJSONObject("JsonAssociationV1"));
countA++;
if(countA%Constants.BATCH_SIZE==0) {
ass.close();
if(batchA%50==0)
log.info("Created batch for assoication");
ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA)))));
}
}else if("TiplocV1".equals(objectType)){
tip.println(obj.getJSONObject("TiplocV1"));
countT++;
if(countT%Constants.BATCH_SIZE==0) {
tip.close();
if(batchT%50==0)
log.info("Created batch for TIPLOC");
tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT)))));
}
}else if("JsonScheduleV1".equals(objectType)){
sch.println(obj.getJSONObject("JsonScheduleV1"));
countS++;
if(countS%Constants.BATCH_SIZE==0) {
sch.close();
if(batchS%50==0)
log.info("Created batch for schedule");
sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS)))));
}
}else if("EOF".equals(objectType)){
//Nothing to do
}else {
log.fatal("Unhandled type {}", objectType);
System.exit(2);
}
}
ass.close();
tip.close();
sch.close();
log.info("Created batch for assoication");
log.info("Created batch for TIPLOC");
log.info("Created batch for schedule");
log.info("Total count : {}", count);
}catch(ZipException e) {
log.error(e.getMessage(), e);
captureFile();
}catch(IOException e) {
log.error(e.getMessage(), e);
System.exit(1);
}
try {
fis.close();
}catch(IOException e) {
log.fatal(e.getMessage(), e);
System.exit(1);
}
log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS);
ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(Integer.parseInt(ConfigurationManager.getInstance().getProperty("thread", "20")));
for(int i=0;i<=batchA;i++) {
if(i%50==0)
log.info("Queued Assoication - {}", i);
threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i)));
}
for(int i=0;i<=batchT;i++) {
if(i%50==0)
log.info("Queued Tiploc - {}", i);
threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i)));
}
for(int i=0;i<=batchS;i++) {
if(i%50==0)
log.info("Queued Schedule - {}", i);
threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i)));
}
log.info("All entry queued.");
threadPool.shutdown();
try {
threadPool.awaitTermination(3, TimeUnit.HOURS);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
if(!threadPool.isTerminated()) {
log.error("Some job cannot be finished!");
System.exit(50);
}
finalizeDB();
DatabaseManager.getInstance().shutdown();
if(!tempDir.delete()) {
log.warn("Unable to remove temp dir!");
}
log.info("Job finished!");
}
private static final String [] TABLES = {
"tiploc","train_assoc","train_schedule",
"train_schedule_detail","train_schedule_location","train_error",
};
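// Promote the freshly loaded n_* tables to the live names by renaming the old
// tables to o_* first, then drop the o_* backups once the swap succeeds.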
private void finalizeDB() {
try(
Connection conn = DatabaseManager.getInstance().getConnection();
Statement stmt = conn.createStatement();
){
for(String table:TABLES) {
log.info("Handling {}", table);
stmt.execute("DROP TABLE IF EXISTS o_"+table);
stmt.execute("RENAME TABLE "+table+" TO o_"+table+",n_"+table+" TO "+table);
}
log.info("Creating extra index. This may take a while");
stmt.execute("ALTER TABLE train_schedule_location ADD INDEX (tiploc) USING BTREE");
log.info("Removing backup tables");
for(String table:TABLES) {
stmt.execute("DROP TABLE IF EXISTS o_"+table);
}
} catch (SQLException e) {
log.fatal(e.getMessage(), e);
System.exit(1);
}
}
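// Recreate the staging n_* tables by executing the statements bundled in db.sql
// (split on ';') before any data is loaded.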
private void prepDB() {
log.info("Preparing the database.");
StringBuilder sqls = new StringBuilder();
try(BufferedReader br = new BufferedReader(new InputStreamReader(Thread.currentThread().getContextClassLoader().getResourceAsStream("org/leolo/rail/nrd/db.sql")))){
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
sqls.append(line);
}
} catch (IOException e) {
log.fatal(e.getMessage(), e);
System.exit(1);
}
log.info("Done loading SQLs from resource");
try(
Connection conn = DatabaseManager.getInstance().getConnection();
Statement stmt = conn.createStatement();
){
for(String sql:sqls.toString().split(";")) {
log.debug("SQL: {}", sql);
stmt.execute(sql);
}
} catch (SQLException e) {
log.fatal(e.getMessage(), e);
System.exit(1);
}
log.info("Done prepare the DB");
}
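// Fallback used when the download is not valid gzip: save the raw response to
// output.file so it can be inspected.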
private void captureFile() {
try {
URL url = new URL(ConfigurationManager.getInstance().get("remote.path").toString());
BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream()));
PrintWriter out = new PrintWriter("output.file");
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
out.println(line);
}
out.close();
br.close();
}catch(IOException e) {
log.error(e.getMessage(), e);
}
}
}

275
src/org/leolo/rail/nrd/ScheduleHandler.java

@@ -0,0 +1,275 @@
package org.leolo.rail.nrd;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
public class ScheduleHandler implements Runnable{
private static Logger log = LogManager.getLogger(ScheduleHandler.class);
private final Object SYNC_TOKEN = new Object();
private boolean shutdown = false;
private boolean terminated = false;
private Queue<JSONObject> queue = new LinkedList<>();
public ScheduleHandler() {
new Thread(this).start();
}
public void add(JSONObject obj) {
if(obj!=null) {
queue.add(obj);
}else {
log.warn("Trying to add a null object!");
}
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdown() {
shutdown = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdownAndWait() {
shutdown();
while(true) {
if(terminated) {
break;
}
synchronized(SYNC_TOKEN) {
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null);
try {
SYNC_TOKEN.notifyAll();
SYNC_TOKEN.wait(1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
private int currentSize = 0;
private int commitCount = 0;
private int trainCount = 0;
private synchronized String getTUID() {
String id = Integer.toString(++trainCount, 36);
StringBuffer sb = new StringBuffer();
for(int i = id.length();i<8;i++) {
sb.append("0");
}
sb.append(id);
return sb.toString();
}
private Time parseTime(String time) {
try {
int hour = Integer.parseInt(time.substring(0, 2));
int min = Integer.parseInt(time.substring(2, 4));
boolean halfMin = time.length()>4 && 'H' == time.charAt(4);
return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0));
}catch(RuntimeException e) {
log.error("For time \"{}\":{}", time, e.getMessage(), e);
}
return null;
}
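// Allowances arrive as whole minutes with an optional trailing 'H' for an extra
// half minute (e.g. "2H" -> 00:02:30); a bare "H" means 30 seconds.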
private String parseSTime(String time) {
if("H".equals(time))
return "00:00:30";
int min = Integer.parseInt(time.substring(0,1));
boolean half = time.length()>1 && 'H' == time.charAt(1);
return "00:0"+min+(half?":30":":00");
}
private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setTime(pos, parseTime(time));
}
}
private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setString(pos, parseSTime(time));
}
}
@Override
public void run() {
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?,?)");
PreparedStatement sDetail = conn.prepareStatement("INSERT INTO train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
PreparedStatement sError = conn.prepareStatement("INSERT INTO train_error VALUES (?,?)");
PreparedStatement sLoca = conn.prepareStatement("INSERT INTO train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
while(true) {
JSONObject obj = queue.poll();
if(shutdown && (queue.isEmpty()||obj==null)) {
sMain.executeBatch();
sDetail.executeBatch();
sError.executeBatch();
sLoca.executeBatch();
conn.commit();
log.info("Train schedule committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount);
break;
}else if(obj==null) {
log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
log.debug("Woke up.");
continue;
}
synchronized(FileLoader.SYNC_TOKEN) {
FileLoader.SYNC_TOKEN.notifyAll();
}
//Parse data
String trainUID = getTUID();
sMain.setString(1, trainUID);
sMain.setString(2, obj.optString("CIF_train_uid"));
try {
sMain.setDate(3, new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime()));
sMain.setDate(4, new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime()));
} catch (ParseException e1) {
log.error(e1.getMessage(), e1);
continue;
}
sMain.setString(5, obj.optString("schedule_days_runs"));
sMain.setString(6, obj.optString("train_status"));
sMain.setString(7, obj.optString("atoc_code"));
sMain.addBatch();
currentSize++;
JSONObject detail = obj.optJSONObject("schedule_segment");
setString(sDetail, 1, trainUID);
setString(sDetail, 2, detail.optString("CIF_train_category"));
setString(sDetail, 3, detail.optString("signalling_id"));
setString(sDetail, 4, detail.optString("CIF_headcode"));
setString(sDetail, 5, detail.optString("CIF_course_indicator"));
setString(sDetail, 6, detail.optString("CIF_train_service_code"));
setString(sDetail, 7, detail.optString("CIF_business_sector"));
setString(sDetail, 8, detail.optString("CIF_power_type"));
setString(sDetail, 9, detail.optString("CIF_timing_load"));
setInt (sDetail,10, detail.optString("CIF_speed"));
setString(sDetail,11, detail.optString("CIF_operating_characteristics"));
setString(sDetail,12, detail.optString("CIF_sleepers"));
setString(sDetail,13, detail.optString("CIF_reservations"));
setString(sDetail,14, detail.optString("CIF_connection_indicator"));
setString(sDetail,15, detail.optString("CIF_catering_code"));
setString(sDetail,16, detail.optString("CIF_service_branding"));
sDetail.addBatch();
currentSize++;
JSONArray locations = detail.optJSONArray("schedule_location");
if(locations!=null) {
for(int i=0;i<locations.length();i++) {
JSONObject location = locations.getJSONObject(i);
String type = location.getString("location_type");
if("LO".equals(type) || "LI".equals(type) || "LT".equals(type)) {
setString(sLoca, 1, trainUID);
sLoca.setInt(2, i);
setString(sLoca, 3, location.optString("tiploc_code"));
setTime (sLoca, 4, location.optString("arrival"));
setTime (sLoca, 5, location.optString("public_arrival"));
setTime (sLoca, 6, location.optString("departure"));
setTime (sLoca, 7, location.optString("public_departure"));
setTime (sLoca, 8, location.optString("pass"));
setString(sLoca, 9, location.optString("platform"));
setString(sLoca,10, location.optString("line"));
setString(sLoca,11, location.optString("path"));
setSTime (sLoca,12, location.optString("engineering_allowance"));
setSTime (sLoca,13, location.optString("pathing_allowance"));
setSTime (sLoca,14, location.optString("performance_allowance"));
sLoca.addBatch();
currentSize++;
}else {
log.fatal("Unknown entry type {}", type);
System.exit(3);
}
}
}else {
sError.setString(1, trainUID);
sError.setString(2, obj.toString());
// sError.addBatch();
// currentSize++;
}
if(currentSize>=Constants.BATCH_SIZE) {
sMain.executeBatch();
sDetail.executeBatch();
sError.executeBatch();
sLoca.executeBatch();
conn.commit();
log.info("Train schedule committed {} entry. Commit #{}", currentSize, ++commitCount);
currentSize = 0;
}
if(queue.isEmpty()) {
// log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
// log.debug("Woke up.");
}
}
}catch(SQLException e) {
log.error(e.getMessage(),e);
}
terminated = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
private void setInt(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null || "".equals(string))
stmt.setNull(i, Types.INTEGER);
else
stmt.setInt(i, Integer.parseInt(string));
}
private void setString(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null)
stmt.setNull(i, Types.VARCHAR);
else
stmt.setString(i, string);
}
public int getQueueSize() {
return queue.size();
}
}

244
src/org/leolo/rail/nrd/ScheduleProcessor.java

@@ -0,0 +1,244 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Types;
import java.text.DateFormat;
import java.text.FieldPosition;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import java.util.Random;
import java.util.zip.GZIPInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
public class ScheduleProcessor implements Runnable {
private File fileName;
private Logger log = LogManager.getLogger(getClass());
private static TUIDDateFormat tdf = new TUIDDateFormat();
public static final String SEQ_ID = "0123456789";
private static Random r = new Random();
private static class TUIDDateFormat extends DateFormat{
public static final String MONTH_ID = "MBTQPHSONDUE";
public static final String DAY_ID = "0123456789ABCDEFGHJKLMNPRSTUVWX";
@Override
public StringBuffer format(Date arg0, StringBuffer arg1, FieldPosition arg2) {
// TODO Auto-generated method stub
arg1.append(MONTH_ID.charAt(arg0.getMonth()));
arg1.append(DAY_ID.charAt(arg0.getDate()-1));
return arg1;
}
@Override
public Date parse(String arg0, ParsePosition arg1) {
throw new UnsupportedOperationException();
}
}
ScheduleProcessor(File fileName){
this.fileName = fileName;
}
private static int trainCount = 0;
private static synchronized String getTUID() {
String id = Integer.toString(++trainCount, 36);
StringBuffer sb = new StringBuffer();
for(int i = id.length();i<8;i++) {
sb.append("0");
}
sb.append(id);
return sb.toString();
}
private static Hashtable<String, Integer> uidMap = new Hashtable<>();
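// Build a schedule key from the CIF UID, two-character encodings of the start
// and end dates, and a trailing digit that disambiguates otherwise identical keys.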
private static synchronized String getTUID(String uid, Date startDate, Date endDate) {
String buid = uid+tdf.format(startDate)+tdf.format(endDate);
int count = 0;
if(uidMap.containsKey(buid)) {
count = uidMap.get(buid);
count +=1;
uidMap.put(buid, count);
}else {
uidMap.put(buid, 0);
}
return buid+SEQ_ID.charAt(count);
}
private String parseTime(String time) {
try {
int hour = Integer.parseInt(time.substring(0, 2));
int min = Integer.parseInt(time.substring(2, 4));
boolean halfMin = time.length()>4 && 'H' == time.charAt(4);
return hour+":"+min+(halfMin?":30":":00");
}catch(RuntimeException e) {
log.error("For time \"{}\":{}", time, e.getMessage(), e);
}
return null;
}
private String parseSTime(String time) {
if("H".equals(time))
return "00:00:30";
int min = Integer.parseInt(time.substring(0,1));
boolean half = time.length()>1 && 'H' == time.charAt(1);
return "00:0"+min+(half?":30":":00");
}
private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setString(pos, parseTime(time));
}
}
private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setString(pos, parseSTime(time));
}
}
@Override
public void run() {
log.info("Processing {}", fileName.getName());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
try(
BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(fileName))));
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement sMain = conn.prepareStatement("INSERT INTO n_train_schedule VALUES (?,?,?,?,?,?,?)");
PreparedStatement sDetail = conn.prepareStatement("INSERT INTO n_train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
PreparedStatement sError = conn.prepareStatement("INSERT INTO n_train_error VALUES (?,?)");
PreparedStatement sLoca = conn.prepareStatement("INSERT INTO n_train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
){
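// Each line is one JsonScheduleV1 record; write its header, detail and
// per-location rows, committing the whole batch file as a single transaction at the end.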
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
String trainUID;
JSONObject obj = new JSONObject(line);
sMain.setString(2, obj.optString("CIF_train_uid"));
try {
java.sql.Date startDate = new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime());
java.sql.Date endDate = new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime());
trainUID = getTUID(obj.optString("CIF_train_uid"), startDate, endDate);
sMain.setDate(3, startDate);
sMain.setDate(4, endDate);
} catch (ParseException e1) {
log.error(e1.getMessage(), e1);
continue;
}
sMain.setString(1, trainUID);
sMain.setString(5, obj.optString("schedule_days_runs"));
sMain.setString(6, obj.optString("train_status"));
sMain.setString(7, obj.optString("atoc_code"));
sMain.addBatch();
JSONObject detail = obj.optJSONObject("schedule_segment");
setString(sDetail, 1, trainUID);
setString(sDetail, 2, detail.optString("CIF_train_category"));
setString(sDetail, 3, detail.optString("signalling_id"));
setString(sDetail, 4, detail.optString("CIF_headcode"));
setString(sDetail, 5, detail.optString("CIF_course_indicator"));
setString(sDetail, 6, detail.optString("CIF_train_service_code"));
setString(sDetail, 7, detail.optString("CIF_business_sector"));
setString(sDetail, 8, detail.optString("CIF_power_type"));
setString(sDetail, 9, detail.optString("CIF_timing_load"));
setInt (sDetail,10, detail.optString("CIF_speed"));
setString(sDetail,11, detail.optString("CIF_operating_characteristics"));
setString(sDetail,12, detail.optString("CIF_sleepers"));
setString(sDetail,13, detail.optString("CIF_reservations"));
setString(sDetail,14, detail.optString("CIF_connection_indicator"));
setString(sDetail,15, detail.optString("CIF_catering_code"));
setString(sDetail,16, detail.optString("CIF_service_branding"));
sDetail.addBatch();
JSONArray locations = detail.optJSONArray("schedule_location");
if(locations!=null) {
for(int i=0;i<locations.length();i++) {
JSONObject location = locations.getJSONObject(i);
String type = location.getString("location_type");
if("LO".equals(type) || "LI".equals(type) || "LT".equals(type)) {
setString(sLoca, 1, trainUID);
sLoca.setInt(2, i);
setString(sLoca, 3, location.optString("tiploc_code"));
setTime (sLoca, 4, location.optString("arrival"));
setTime (sLoca, 5, location.optString("public_arrival"));
setTime (sLoca, 6, location.optString("departure"));
setTime (sLoca, 7, location.optString("public_departure"));
setTime (sLoca, 8, location.optString("pass"));
setString(sLoca, 9, location.optString("platform"));
setString(sLoca,10, location.optString("line"));
setString(sLoca,11, location.optString("path"));
setSTime (sLoca,12, location.optString("engineering_allowance"));
setSTime (sLoca,13, location.optString("pathing_allowance"));
setSTime (sLoca,14, location.optString("performance_allowance"));
sLoca.addBatch();
}else {
log.fatal("Unknown entry type {}", type);
System.exit(3);
}
}
}else {
sError.setString(1, trainUID);
sError.setString(2, obj.toString());
// sError.addBatch();
// currentSize++;
}
}
sMain.executeBatch();
sDetail.executeBatch();
sError.executeBatch();
sLoca.executeBatch();
conn.commit();
log.info("Batch {} committed.", fileName.getName());
} catch (FileNotFoundException e) {
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
if(!fileName.delete()) {
log.warn("Unable to delete {}", fileName.getName());
}
}
private void setInt(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null || "".equals(string))
stmt.setNull(i, Types.INTEGER);
else
stmt.setInt(i, Integer.parseInt(string));
}
private void setString(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null)
stmt.setNull(i, Types.VARCHAR);
else
stmt.setString(i, string);
}
}

77
src/org/leolo/rail/nrd/TimetableHandler.java

@@ -0,0 +1,77 @@
package org.leolo.rail.nrd;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class TimetableHandler implements Runnable{
private static Logger log = LogManager.getLogger(TimetableHandler.class);
private final Object SYNC_TOKEN = new Object();
private boolean shutdown = false;
private boolean terminated = false;
private Queue<JSONObject> queue = new LinkedList<>();
public TimetableHandler() {
}
public void add(JSONObject obj) {
queue.add(obj);
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdown() {
shutdown = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdownAndWait() {
shutdown();
while(true) {
if(terminated) {
break;
}
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
@Override
public void run() {
while(true) {
if(shutdown && queue.isEmpty()) {
break;
}
//TODO: actual process
if(!queue.isEmpty()) {
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
terminated = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
}

108
src/org/leolo/rail/nrd/TimetableHandler2.java

@@ -0,0 +1,108 @@
package org.leolo.rail.nrd;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class TimetableHandler2 implements Runnable{
private static Logger log = LogManager.getLogger(TimetableHandler2.class);
private final Object SYNC_TOKEN = new Object();
private boolean shutdown = false;
private boolean terminated = false;
private Queue<JSONObject> queue = new LinkedList<>();
public TimetableHandler2() {
new Thread(this).start();
}
public void add(JSONObject obj) {
queue.add(obj);
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdown() {
shutdown = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdownAndWait() {
shutdown();
while(true) {
if(terminated) {
break;
}
synchronized(SYNC_TOKEN) {
log.debug("Waiting for termination.");
try {
SYNC_TOKEN.notifyAll();
SYNC_TOKEN.wait(1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
@Override
public void run() {
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("")
){
int currentSize = 0;
int commitCount = 0;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
while(true) {
JSONObject obj = queue.poll();
if(shutdown && (queue.isEmpty()||obj==null)) {
pstmt.executeBatch();
conn.commit();
log.info("Train assoc committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount);
break;
}else if(obj==null) {
continue;
}
//Parse data
pstmt.addBatch();
if(++currentSize>=Constants.BATCH_SIZE) {
pstmt.executeBatch();
conn.commit();
log.info("Train assoc committed {} entry. Commit #{}", currentSize, ++commitCount);
currentSize = 0;
}
if(queue.isEmpty()) {
log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
}catch(SQLException e) {
log.error(e.getMessage(),e);
}
terminated = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
}

153
src/org/leolo/rail/nrd/TiplocHandler.java

@@ -0,0 +1,153 @@
package org.leolo.rail.nrd;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class TiplocHandler implements Runnable{
private static Logger log = LogManager.getLogger(TiplocHandler.class);
private final Object SYNC_TOKEN = new Object();
private boolean shutdown = false;
private boolean terminated = false;
private Queue<JSONObject> queue = new LinkedList<>();
public TiplocHandler() {
new Thread(this).start();
}
public void add(JSONObject obj) {
if(obj!=null) {
// log.debug("Size {} >>", queue.size());
queue.add(obj);
}else {
log.warn("Trying to add a null object!");
}
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdown() {
shutdown = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void shutdownAndWait() {
shutdown();
while(true) {
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null);
if(terminated) {
break;
}
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.notifyAll();
SYNC_TOKEN.wait(1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
private int currentSize = 0;
private int commitCount = 0;
@Override
public void run() {
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)")
){
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
while(true) {
JSONObject obj = queue.poll();
if(shutdown && (queue.isEmpty()||obj==null)) {
pstmt.executeBatch();
conn.commit();
log.info("TIPLOC committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount);
break;
}else if(obj==null) {
log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
log.debug("Woke up.");
continue;
}
synchronized(FileLoader.SYNC_TOKEN) {
FileLoader.SYNC_TOKEN.notifyAll();
}
//Parse data
pstmt.setString(1, obj.optString("tiploc_code"));
if(obj.has("nalco"))
pstmt.setString(2, obj.optString("nalco"));
else
pstmt.setNull(2, Types.VARCHAR);
if(obj.has("stanox"))
pstmt.setString(3, obj.optString("stanox"));
else
pstmt.setNull(3, Types.VARCHAR);
if(obj.has("crs_code"))
pstmt.setString(4, obj.optString("crs_code"));
else
pstmt.setNull(4, Types.VARCHAR);
if(obj.has("description"))
pstmt.setString(5, obj.optString("description"));
else
pstmt.setNull(5, Types.VARCHAR);
if(obj.has("tps_description"))
pstmt.setString(6, obj.optString("tps_description"));
else
pstmt.setNull(6, Types.VARCHAR);
pstmt.addBatch();
if(++currentSize>=Constants.BATCH_SIZE) {
pstmt.executeBatch();
conn.commit();
log.info("TIPLOC committed {} entry. Commit #{}", currentSize, ++commitCount);
currentSize = 0;
}
if(queue.isEmpty()) {
// log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
}catch(SQLException e) {
log.error(e.getMessage(),e);
}
terminated = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public int getQueueSize() {
return queue.size();
}
}

85
src/org/leolo/rail/nrd/TiplocProcessor.java

@@ -0,0 +1,85 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.zip.GZIPInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class TiplocProcessor implements Runnable {
private File fileName;
private Logger log = LogManager.getLogger(getClass());
TiplocProcessor(File fileName){
this.fileName = fileName;
}
@Override
public void run() {
log.info("Processing {}", fileName.getName());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
try(
BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(fileName))));
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO n_tiploc VALUES (?,?,?,?,?,?)")
){
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
JSONObject obj = new JSONObject(line);
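// Map each TIPLOC record onto the n_tiploc columns; optional fields become SQL NULL
// when absent from the JSON.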
pstmt.setString(1, obj.optString("tiploc_code"));
if(obj.has("nalco"))
pstmt.setString(2, obj.optString("nalco"));
else
pstmt.setNull(2, Types.VARCHAR);
if(obj.has("stanox"))
pstmt.setString(3, obj.optString("stanox"));
else
pstmt.setNull(3, Types.VARCHAR);
if(obj.has("crs_code"))
pstmt.setString(4, obj.optString("crs_code"));
else
pstmt.setNull(4, Types.VARCHAR);
if(obj.has("description"))
pstmt.setString(5, obj.optString("description"));
else
pstmt.setNull(5, Types.VARCHAR);
if(obj.has("tps_description"))
pstmt.setString(6, obj.optString("tps_description"));
else
pstmt.setNull(6, Types.VARCHAR);
pstmt.addBatch();
}
pstmt.executeBatch();
conn.commit();
log.info("Batch {} committed.", fileName.getName());
} catch (FileNotFoundException e) {
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
if(!fileName.delete()) {
log.warn("Unable to delete {}", fileName.getName());
}
}
}

88
src/org/leolo/rail/nrd/db.sql

@@ -0,0 +1,88 @@
DROP TABLE IF EXISTS `n_tiploc`;
CREATE TABLE IF NOT EXISTS `n_tiploc` (
`tiploc_code` varchar(8) NOT NULL,
`nalco` varchar(6) DEFAULT NULL,
`stanox` varchar(5) DEFAULT NULL,
`crs` varchar(3) DEFAULT NULL,
`description` varchar(100) DEFAULT NULL,
`tps_desc` varchar(100) DEFAULT NULL,
PRIMARY KEY (`tiploc_code`),
KEY `nalco` (`nalco`),
KEY `stanox` (`stanox`),
KEY `crs` (`crs`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS `n_train_assoc`;
CREATE TABLE IF NOT EXISTS `n_train_assoc` (
`assoc_id` int(11) NOT NULL AUTO_INCREMENT,
`main_train_uid` char(8) NOT NULL,
`assoc_train_uid` char(8) NOT NULL,
`start_date` date NOT NULL,
`end_date` date NOT NULL,
`days` char(7) NOT NULL DEFAULT '111111',
`assoc_cat` char(2) NOT NULL DEFAULT '',
`date_ind` char(1) NOT NULL DEFAULT '',
`tiploc` char(8) NOT NULL DEFAULT '',
`base_suffix` char(7) DEFAULT '',
`assoc_suffix` char(7) DEFAULT '',
`diagram_type` char(1) NOT NULL DEFAULT '',
`stp_ind` char(1) NOT NULL DEFAULT '',
PRIMARY KEY (`assoc_id`),
KEY `main_train_uid` (`main_train_uid`),
KEY `assoc_train_uid` (`assoc_train_uid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS `n_train_error`;
CREATE TABLE IF NOT EXISTS `n_train_error` (
`train_uid` char(8) NOT NULL,
`segment_data` text NOT NULL,
PRIMARY KEY (`train_uid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS `n_train_schedule`;
CREATE TABLE IF NOT EXISTS `n_train_schedule` (
`uid` char(12) NOT NULL,
`train_uid` char(6) NOT NULL,
`start_date` date NOT NULL,
`end_date` date NOT NULL,
`days` char(7) NOT NULL DEFAULT '1111111',
`status` char(10) NOT NULL,
`atoc` char(2) NOT NULL,
PRIMARY KEY (`uid`) USING BTREE,
KEY `train_uid` (`train_uid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS `n_train_schedule_detail`;
CREATE TABLE IF NOT EXISTS `n_train_schedule_detail` (
`uid` char(12) NOT NULL,
`category` char(2) DEFAULT NULL,
`signal_id` char(4) DEFAULT NULL,
`headcode` char(4) DEFAULT NULL,
`course_ind` char(1) DEFAULT NULL,
`service_code` char(8) DEFAULT NULL,
`bus_sector` char(2) DEFAULT NULL,
`power_type` char(3) DEFAULT NULL,
`timing_load` char(4) DEFAULT NULL,
`speed` mediumint(8) unsigned DEFAULT NULL,
`op_chars` char(6) DEFAULT NULL,
`sleeper` char(1) DEFAULT NULL,
`resv` char(1) DEFAULT NULL,
`conn_ind` char(1) DEFAULT NULL,
`catering` char(4) DEFAULT NULL,
`branding` char(4) DEFAULT NULL,
PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS `n_train_schedule_location`;
CREATE TABLE IF NOT EXISTS `n_train_schedule_location` (
`tuid` char(12) NOT NULL,
`seq` int(10) unsigned NOT NULL,
`tiploc` char(8) DEFAULT NULL,
`arrival` time DEFAULT NULL,
`pub_arrival` time DEFAULT NULL,
`departure` time DEFAULT NULL,
`pub_departure` time DEFAULT NULL,
`pass` time DEFAULT NULL,
`platform` char(3) DEFAULT NULL,
`line` char(3) DEFAULT NULL,
`path` char(3) DEFAULT NULL,
`eng_allowance` time DEFAULT NULL,
`path_allowance` time DEFAULT NULL,
`perf_allowance` time DEFAULT NULL,
PRIMARY KEY (`tuid`,`seq`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;