Compare commits
7 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
c39193ec3d | 4 years ago |
|
|
f49d900fdc | 4 years ago |
|
|
76b976e755 | 4 years ago |
|
|
c55082f738 | 4 years ago |
|
|
ba581a5ff7 | 4 years ago |
|
|
35c7e72c49 | 4 years ago |
|
|
e1ae287358 | 4 years ago |
16 changed files with 1723 additions and 0 deletions
@ -1,3 +1,4 @@ |
|||||||
/bin/ |
/bin/ |
||||||
/target/ |
/target/ |
||||||
/configuration.properties |
/configuration.properties |
||||||
|
output.file |
||||||
|
|||||||
@ -0,0 +1,14 @@ |
|||||||
|
<?xml version="1.0" encoding="UTF-8"?> |
||||||
|
<Configuration status="warn"> |
||||||
|
<Appenders> |
||||||
|
<Console name="Console" target="SYSTEM_OUT"> |
||||||
|
<PatternLayout |
||||||
|
pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" /> |
||||||
|
</Console> |
||||||
|
</Appenders> |
||||||
|
<Loggers> |
||||||
|
<Root level="debug"> |
||||||
|
<AppenderRef ref="Console" /> |
||||||
|
</Root> |
||||||
|
</Loggers> |
||||||
|
</Configuration> |
||||||
@ -0,0 +1,144 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Types; |
||||||
|
import java.text.ParseException; |
||||||
|
import java.text.SimpleDateFormat; |
||||||
|
import java.util.LinkedList; |
||||||
|
import java.util.Queue; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class AssoicationHandler implements Runnable{ |
||||||
|
private static Logger log = LogManager.getLogger(AssoicationHandler.class); |
||||||
|
|
||||||
|
private final Object SYNC_TOKEN = new Object(); |
||||||
|
private boolean shutdown = false; |
||||||
|
private boolean terminated = false; |
||||||
|
private Queue<JSONObject> queue = new LinkedList<>(); |
||||||
|
|
||||||
|
|
||||||
|
public AssoicationHandler() { |
||||||
|
new Thread(this).start(); |
||||||
|
} |
||||||
|
|
||||||
|
public void add(JSONObject obj) { |
||||||
|
if(obj!=null) { |
||||||
|
queue.add(obj); |
||||||
|
}else { |
||||||
|
log.warn("Trying to add a null object!"); |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdown() { |
||||||
|
shutdown = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdownAndWait() { |
||||||
|
shutdown(); |
||||||
|
while(true) { |
||||||
|
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null); |
||||||
|
|
||||||
|
if(terminated) { |
||||||
|
break; |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
SYNC_TOKEN.wait(1000); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private int currentSize = 0; |
||||||
|
private int commitCount = 0; |
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
try( |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)") |
||||||
|
){ |
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); |
||||||
|
while(true) { |
||||||
|
JSONObject obj = queue.poll(); |
||||||
|
if(shutdown && (queue.isEmpty()||obj==null)) { |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Train assoc committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); |
||||||
|
break; |
||||||
|
}else if(obj==null) { |
||||||
|
continue; |
||||||
|
} |
||||||
|
synchronized(FileLoader.SYNC_TOKEN) { |
||||||
|
FileLoader.SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
// log.info("Assoc {}-{}@{}",obj.optString("main_train_uid"),obj.optString("assoc_train_uid"), obj.optString("location"));
|
||||||
|
pstmt.setString(1, obj.optString("main_train_uid")); |
||||||
|
pstmt.setString(2, obj.optString("assoc_train_uid")); |
||||||
|
try { |
||||||
|
pstmt.setDate(3, new java.sql.Date(sdf.parse(obj.optString("assoc_start_date")).getTime())); |
||||||
|
pstmt.setDate(4, new java.sql.Date(sdf.parse(obj.optString("assoc_end_date")).getTime())); |
||||||
|
} catch (ParseException e1) { |
||||||
|
log.warn("Unable to parse date! {}", e1.getMessage(), e1); |
||||||
|
continue; |
||||||
|
} |
||||||
|
pstmt.setString(5, obj.optString("assoc_days")); |
||||||
|
pstmt.setString(6, obj.optString("category")); |
||||||
|
pstmt.setString(7, obj.optString("date_indicator")); |
||||||
|
pstmt.setString(8, obj.optString("location")); |
||||||
|
if(obj.optString("base_location_suffix")!=null) |
||||||
|
pstmt.setString(9, obj.optString("base_location_suffix")); |
||||||
|
else |
||||||
|
pstmt.setNull(9, Types.VARCHAR); |
||||||
|
if(obj.optString("assoc_location_suffix")!=null) |
||||||
|
pstmt.setString(10, obj.optString("assoc_location_suffix")); |
||||||
|
else |
||||||
|
pstmt.setNull(10, Types.VARCHAR); |
||||||
|
pstmt.setString(11, obj.optString("diagram_type")); |
||||||
|
pstmt.setString(12, obj.optString("CIF_stp_indicator")); |
||||||
|
pstmt.addBatch(); |
||||||
|
if(++currentSize>=Constants.BATCH_SIZE) { |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Train assoc committed {} entry. Commit #{}", currentSize, ++commitCount); |
||||||
|
currentSize = 0; |
||||||
|
} |
||||||
|
if(queue.isEmpty()) { |
||||||
|
log.debug("Empty queue. Wait for more entries"); |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
}catch(SQLException e) { |
||||||
|
log.error(e.getMessage(),e); |
||||||
|
} |
||||||
|
terminated = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
public int getQueueSize() { |
||||||
|
return queue.size(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,86 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.io.BufferedReader; |
||||||
|
import java.io.File; |
||||||
|
import java.io.FileInputStream; |
||||||
|
import java.io.FileNotFoundException; |
||||||
|
import java.io.FileReader; |
||||||
|
import java.io.IOException; |
||||||
|
import java.io.InputStreamReader; |
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Types; |
||||||
|
import java.text.ParseException; |
||||||
|
import java.text.SimpleDateFormat; |
||||||
|
import java.util.zip.GZIPInputStream; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class AssoicationProcessor implements Runnable { |
||||||
|
|
||||||
|
private File fileName; |
||||||
|
private Logger log = LogManager.getLogger(getClass()); |
||||||
|
|
||||||
|
AssoicationProcessor(File fileName){ |
||||||
|
this.fileName = fileName; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
log.info("Processing {}", fileName.getName()); |
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); |
||||||
|
try( |
||||||
|
BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(fileName)))); |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO n_train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)") |
||||||
|
){ |
||||||
|
while(true) { |
||||||
|
String line = br.readLine(); |
||||||
|
if(line==null) { |
||||||
|
break; |
||||||
|
} |
||||||
|
JSONObject obj = new JSONObject(line); |
||||||
|
pstmt.setString(1, obj.optString("main_train_uid")); |
||||||
|
pstmt.setString(2, obj.optString("assoc_train_uid")); |
||||||
|
try { |
||||||
|
pstmt.setDate(3, new java.sql.Date(sdf.parse(obj.optString("assoc_start_date")).getTime())); |
||||||
|
pstmt.setDate(4, new java.sql.Date(sdf.parse(obj.optString("assoc_end_date")).getTime())); |
||||||
|
} catch (ParseException e1) { |
||||||
|
log.warn("Unable to parse date! {}", e1.getMessage(), e1); |
||||||
|
continue; |
||||||
|
} |
||||||
|
pstmt.setString(5, obj.optString("assoc_days")); |
||||||
|
pstmt.setString(6, obj.optString("category")); |
||||||
|
pstmt.setString(7, obj.optString("date_indicator")); |
||||||
|
pstmt.setString(8, obj.optString("location")); |
||||||
|
if(obj.optString("base_location_suffix")!=null) |
||||||
|
pstmt.setString(9, obj.optString("base_location_suffix")); |
||||||
|
else |
||||||
|
pstmt.setNull(9, Types.VARCHAR); |
||||||
|
if(obj.optString("assoc_location_suffix")!=null) |
||||||
|
pstmt.setString(10, obj.optString("assoc_location_suffix")); |
||||||
|
else |
||||||
|
pstmt.setNull(10, Types.VARCHAR); |
||||||
|
pstmt.setString(11, obj.optString("diagram_type")); |
||||||
|
pstmt.setString(12, obj.optString("CIF_stp_indicator")); |
||||||
|
pstmt.addBatch(); |
||||||
|
} |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Batch {} committed.", fileName.getName()); |
||||||
|
} catch (FileNotFoundException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} catch (IOException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} catch (SQLException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
if(!fileName.delete()) { |
||||||
|
log.warn("Unable to delete {}", fileName.getName()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,63 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.io.FileReader; |
||||||
|
import java.io.IOException; |
||||||
|
import java.util.Properties; |
||||||
|
import java.util.function.BiConsumer; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
|
||||||
|
public class ConfigurationManager { |
||||||
|
private static Logger log = LogManager.getLogger(ConfigurationManager.class); |
||||||
|
|
||||||
|
private static ConfigurationManager instance; |
||||||
|
private Properties prop = new Properties(); |
||||||
|
|
||||||
|
public synchronized static ConfigurationManager getInstance() { |
||||||
|
if(instance==null) { |
||||||
|
instance = new ConfigurationManager(); |
||||||
|
} |
||||||
|
return instance; |
||||||
|
} |
||||||
|
|
||||||
|
private ConfigurationManager() { |
||||||
|
try(FileReader fr = new FileReader("configuration.properties")){ |
||||||
|
log.debug("Loading properties file"); |
||||||
|
prop.load(fr); |
||||||
|
log.info("{} entries loaded", prop.size()); |
||||||
|
}catch(IOException e) { |
||||||
|
log.fatal(e.getMessage(), e); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
public void forEach(BiConsumer<? super Object, ? super Object> action) { |
||||||
|
prop.forEach(action); |
||||||
|
} |
||||||
|
|
||||||
|
public Object get(Object key) { |
||||||
|
return prop.get(key); |
||||||
|
} |
||||||
|
|
||||||
|
public Object getOrDefault(Object key, Object defaultValue) { |
||||||
|
return prop.getOrDefault(key, defaultValue); |
||||||
|
} |
||||||
|
|
||||||
|
public String getProperty(String key, String defaultValue) { |
||||||
|
return prop.getProperty(key, defaultValue); |
||||||
|
} |
||||||
|
|
||||||
|
public String getProperty(String key) { |
||||||
|
return prop.getProperty(key); |
||||||
|
} |
||||||
|
|
||||||
|
public int size() { |
||||||
|
return prop.size(); |
||||||
|
} |
||||||
|
|
||||||
|
public boolean containsKey(Object key) { |
||||||
|
return prop.containsKey(key); |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,8 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
public class Constants { |
||||||
|
public static final int BATCH_SIZE = 1000; |
||||||
|
|
||||||
|
@Deprecated |
||||||
|
public static final int MAX_QUEUE_SIZE = 40000; |
||||||
|
} |
||||||
@ -0,0 +1,102 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.ResultSet; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Statement; |
||||||
|
|
||||||
|
import javax.sql.XAConnection; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.mariadb.jdbc.MariaDbPoolDataSource; |
||||||
|
|
||||||
|
public class DatabaseManager { |
||||||
|
|
||||||
|
private static Logger log = LogManager.getLogger(DatabaseManager.class); |
||||||
|
|
||||||
|
private static DatabaseManager instance; |
||||||
|
|
||||||
|
private MariaDbPoolDataSource ds; |
||||||
|
|
||||||
|
public synchronized static DatabaseManager getInstance() { |
||||||
|
if(instance==null) { |
||||||
|
instance = new DatabaseManager(); |
||||||
|
} |
||||||
|
return instance; |
||||||
|
} |
||||||
|
|
||||||
|
private DatabaseManager() { |
||||||
|
ConfigurationManager cm = ConfigurationManager.getInstance(); |
||||||
|
if( |
||||||
|
!cm.containsKey("db.host")|| |
||||||
|
!cm.containsKey("db.user")|| |
||||||
|
!cm.containsKey("db.pwd")|| |
||||||
|
!cm.containsKey("db.name") |
||||||
|
) { |
||||||
|
log.fatal("Missing required property"); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
String url = "jdbc:mariadb://"+cm.getProperty("db.host")+ |
||||||
|
":"+cm.getProperty("db.port", "3306")+ |
||||||
|
"/"+cm.getProperty("db.name"); |
||||||
|
log.info("Connecting to DB {} as {}", url, cm.get("db.user")); |
||||||
|
try { |
||||||
|
ds = new MariaDbPoolDataSource(url); |
||||||
|
ds.setMaxPoolSize(Integer.parseInt(cm.getOrDefault("db.poolsize", "20").toString())); |
||||||
|
ds.setUser(cm.getProperty("db.user").toString()); |
||||||
|
ds.setPassword(cm.getProperty("db.pwd").toString()); |
||||||
|
} catch (SQLException e) { |
||||||
|
log.fatal("Cannot connect to DB",e); |
||||||
|
System.exit(-2); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public boolean testPool() { |
||||||
|
try(Connection conn = ds.getConnection()){ |
||||||
|
try(Statement stmt = conn.createStatement()){ |
||||||
|
try (ResultSet rs = stmt.executeQuery("SELECT CONNECTION_ID()")){ |
||||||
|
if(rs.next()) { |
||||||
|
log.debug("Connection ID: {}", rs.getString(1)); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} catch (SQLException e) { |
||||||
|
log.warn("Exception when testing the connection., e"); |
||||||
|
return false; |
||||||
|
} |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
public Connection getConnection() throws SQLException { |
||||||
|
Connection conn = ds.getConnection(); |
||||||
|
conn.setAutoCommit(false); |
||||||
|
return conn; |
||||||
|
} |
||||||
|
|
||||||
|
public XAConnection getXAConnection() throws SQLException { |
||||||
|
XAConnection conn = ds.getXAConnection(); |
||||||
|
return conn; |
||||||
|
} |
||||||
|
|
||||||
|
@Deprecated |
||||||
|
public void clear() { |
||||||
|
try( |
||||||
|
Connection conn = getConnection(); |
||||||
|
Statement stmt = conn.createStatement(); |
||||||
|
){ |
||||||
|
stmt.execute("TRUNCATE TABLE train_assoc"); |
||||||
|
stmt.execute("TRUNCATE TABLE tiploc"); |
||||||
|
stmt.execute("TRUNCATE TABLE train_schedule"); |
||||||
|
stmt.execute("TRUNCATE TABLE train_schedule_detail"); |
||||||
|
stmt.execute("TRUNCATE TABLE train_schedule_location"); |
||||||
|
stmt.execute("TRUNCATE TABLE train_error"); |
||||||
|
}catch(SQLException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdown() { |
||||||
|
ds.close(); |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,265 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.io.BufferedReader; |
||||||
|
import java.io.File; |
||||||
|
import java.io.FileNotFoundException; |
||||||
|
import java.io.FileOutputStream; |
||||||
|
import java.io.FileReader; |
||||||
|
import java.io.FileWriter; |
||||||
|
import java.io.IOException; |
||||||
|
import java.io.InputStream; |
||||||
|
import java.io.InputStreamReader; |
||||||
|
import java.io.OutputStreamWriter; |
||||||
|
import java.io.PrintWriter; |
||||||
|
import java.net.URL; |
||||||
|
import java.net.URLConnection; |
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Statement; |
||||||
|
import java.util.Base64; |
||||||
|
import java.util.concurrent.ExecutorService; |
||||||
|
import java.util.concurrent.TimeUnit; |
||||||
|
import java.util.zip.GZIPInputStream; |
||||||
|
import java.util.zip.GZIPOutputStream; |
||||||
|
import java.util.zip.ZipException; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class FileLoader { |
||||||
|
|
||||||
|
private static Logger log = LogManager.getLogger(FileLoader.class); |
||||||
|
|
||||||
|
public static final Object SYNC_TOKEN = new Object(); |
||||||
|
|
||||||
|
public static void main(String [] args) { |
||||||
|
new FileLoader().run(); |
||||||
|
} |
||||||
|
|
||||||
|
public void run() { |
||||||
|
log.info("Process started"); |
||||||
|
ConfigurationManager.getInstance().forEach((k,v)->{ |
||||||
|
if(!((String)k).endsWith(".pwd")) { |
||||||
|
log.debug("{} -> {}", k, v); |
||||||
|
}else { |
||||||
|
log.debug("{} -> ****", k); |
||||||
|
} |
||||||
|
}); |
||||||
|
if(DatabaseManager.getInstance().testPool()) { |
||||||
|
log.info("Successfully connected to the database"); |
||||||
|
} |
||||||
|
if(!ConfigurationManager.getInstance().containsKey("file.path")) { |
||||||
|
log.fatal("Cannot find file path"); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
InputStream fis = null; |
||||||
|
//TODO: get the file from Network Rail
|
||||||
|
try { |
||||||
|
URL url = new URL(ConfigurationManager.getInstance().getProperty("remote.path")); |
||||||
|
URLConnection conn = url.openConnection(); |
||||||
|
String userpwd = ConfigurationManager.getInstance().getProperty("remote.user")+":"+ConfigurationManager.getInstance().getProperty("remote.pwd"); |
||||||
|
conn.addRequestProperty("Authorization", "Basic "+Base64.getEncoder().encodeToString(userpwd.getBytes())); |
||||||
|
conn.connect(); |
||||||
|
fis = conn.getInputStream(); |
||||||
|
}catch(IOException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
System.exit(4); |
||||||
|
} |
||||||
|
log.info("Connected!"); |
||||||
|
File tempDir = new File(ConfigurationManager.getInstance().getProperty("file.temp_dir", ".")+"/nrdg"); |
||||||
|
if(!tempDir.exists()) { |
||||||
|
tempDir.mkdirs(); |
||||||
|
} |
||||||
|
// DatabaseManager.getInstance().clear();
|
||||||
|
prepDB(); |
||||||
|
int countA=0, countT=0, countS=0; |
||||||
|
int batchA=0, batchT=0, batchS=0; |
||||||
|
try( |
||||||
|
GZIPInputStream gis = new GZIPInputStream(fis); |
||||||
|
BufferedReader br = new BufferedReader(new InputStreamReader(gis)); |
||||||
|
){ |
||||||
|
PrintWriter ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_0")))); |
||||||
|
PrintWriter tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_0")))); |
||||||
|
PrintWriter sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_0")))); |
||||||
|
int count = 0; |
||||||
|
while(true) { |
||||||
|
String line = br.readLine(); |
||||||
|
if(line==null) { |
||||||
|
break; |
||||||
|
} |
||||||
|
count++; |
||||||
|
JSONObject obj = new JSONObject(line); |
||||||
|
String objectType = obj.keys().next(); |
||||||
|
if("JsonTimetableV1".equals(objectType)) { |
||||||
|
|
||||||
|
}else if("JsonAssociationV1".equals(objectType)){ |
||||||
|
ass.println(obj.getJSONObject("JsonAssociationV1")); |
||||||
|
countA++; |
||||||
|
if(countA%Constants.BATCH_SIZE==0) { |
||||||
|
ass.close(); |
||||||
|
if(batchA%50==0) |
||||||
|
log.info("Created batch for assoication"); |
||||||
|
ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA))))); |
||||||
|
} |
||||||
|
}else if("TiplocV1".equals(objectType)){ |
||||||
|
tip.println(obj.getJSONObject("TiplocV1")); |
||||||
|
countT++; |
||||||
|
if(countT%Constants.BATCH_SIZE==0) { |
||||||
|
tip.close(); |
||||||
|
if(batchT%50==0) |
||||||
|
log.info("Created batch for TIPLOC"); |
||||||
|
tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT))))); |
||||||
|
} |
||||||
|
}else if("JsonScheduleV1".equals(objectType)){ |
||||||
|
sch.println(obj.getJSONObject("JsonScheduleV1")); |
||||||
|
countS++; |
||||||
|
if(countS%Constants.BATCH_SIZE==0) { |
||||||
|
sch.close(); |
||||||
|
if(batchS%50==0) |
||||||
|
log.info("Created batch for schedule"); |
||||||
|
sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS))))); |
||||||
|
} |
||||||
|
}else if("EOF".equals(objectType)){ |
||||||
|
//Nothing to do
|
||||||
|
}else { |
||||||
|
log.fatal("Unhandled type {}", objectType); |
||||||
|
System.exit(2); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
ass.close(); |
||||||
|
tip.close(); |
||||||
|
sch.close(); |
||||||
|
|
||||||
|
log.info("Created batch for assoication"); |
||||||
|
log.info("Created batch for TIPLOC"); |
||||||
|
log.info("Created batch for schedule"); |
||||||
|
log.info("Total count : {}", count); |
||||||
|
}catch(ZipException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
captureFile(); |
||||||
|
}catch(IOException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
try { |
||||||
|
fis.close(); |
||||||
|
}catch(IOException e) { |
||||||
|
log.fatal(e.getMessage(), e); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS); |
||||||
|
ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(Integer.parseInt(ConfigurationManager.getInstance().getProperty("thread", "20"))); |
||||||
|
for(int i=0;i<=batchA;i++) { |
||||||
|
if(i%50==0) |
||||||
|
log.info("Queued Assoication - {}", i); |
||||||
|
threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i))); |
||||||
|
} |
||||||
|
for(int i=0;i<=batchT;i++) { |
||||||
|
if(i%50==0) |
||||||
|
log.info("Queued Tiploc - {}", i); |
||||||
|
threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i))); |
||||||
|
} |
||||||
|
for(int i=0;i<=batchS;i++) { |
||||||
|
if(i%50==0) |
||||||
|
log.info("Queued Schedule - {}", i); |
||||||
|
threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i))); |
||||||
|
} |
||||||
|
log.info("All entry queued."); |
||||||
|
threadPool.shutdown(); |
||||||
|
try { |
||||||
|
threadPool.awaitTermination(3, TimeUnit.HOURS); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
if(!threadPool.isTerminated()) { |
||||||
|
log.error("Some job cannot be finished!"); |
||||||
|
System.exit(50); |
||||||
|
} |
||||||
|
finalizeDB(); |
||||||
|
DatabaseManager.getInstance().shutdown(); |
||||||
|
if(!tempDir.delete()) { |
||||||
|
log.warn("Unable to remove temp dir!"); |
||||||
|
} |
||||||
|
log.info("Job finished!"); |
||||||
|
} |
||||||
|
|
||||||
|
private static final String [] TABLES = { |
||||||
|
"tiploc","train_assoc","train_schedule", |
||||||
|
"train_schedule_detail","train_schedule_location","train_error", |
||||||
|
}; |
||||||
|
|
||||||
|
private void finalizeDB() { |
||||||
|
try( |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
Statement stmt = conn.createStatement(); |
||||||
|
){ |
||||||
|
for(String table:TABLES) { |
||||||
|
log.info("Handling {}", table); |
||||||
|
stmt.execute("DROP TABLE IF EXISTS o_"+table); |
||||||
|
stmt.execute("RENAME TABLE "+table+" TO o_"+table+",n_"+table+" TO "+table); |
||||||
|
} |
||||||
|
log.info("Creating extra index. This may take a while"); |
||||||
|
stmt.execute("ALTER TABLE train_schedule_location ADD INDEX (tiploc) USING BTREE"); |
||||||
|
log.info("Removing backup tables"); |
||||||
|
for(String table:TABLES) { |
||||||
|
stmt.execute("DROP TABLE IF EXISTS o_"+table); |
||||||
|
} |
||||||
|
} catch (SQLException e) { |
||||||
|
log.fatal(e.getMessage(), e); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private void prepDB() { |
||||||
|
log.info("Preparing the database."); |
||||||
|
StringBuilder sqls = new StringBuilder(); |
||||||
|
try(BufferedReader br = new BufferedReader(new InputStreamReader(Thread.currentThread().getContextClassLoader().getResourceAsStream("org/leolo/rail/nrd/db.sql")))){ |
||||||
|
while(true) { |
||||||
|
String line = br.readLine(); |
||||||
|
if(line==null) { |
||||||
|
break; |
||||||
|
} |
||||||
|
sqls.append(line); |
||||||
|
} |
||||||
|
} catch (IOException e) { |
||||||
|
log.fatal(e.getMessage(), e); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
log.info("Done loading SQLs from resource"); |
||||||
|
try( |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
Statement stmt = conn.createStatement(); |
||||||
|
){ |
||||||
|
for(String sql:sqls.toString().split(";")) { |
||||||
|
log.debug("SQL: {}", sql); |
||||||
|
stmt.execute(sql); |
||||||
|
} |
||||||
|
} catch (SQLException e) { |
||||||
|
log.fatal(e.getMessage(), e); |
||||||
|
System.exit(1); |
||||||
|
} |
||||||
|
log.info("Done prepare the DB"); |
||||||
|
} |
||||||
|
|
||||||
|
private void captureFile() { |
||||||
|
try { |
||||||
|
URL url = new URL(ConfigurationManager.getInstance().get("remote.path").toString()); |
||||||
|
BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream())); |
||||||
|
PrintWriter out = new PrintWriter("output.file"); |
||||||
|
while(true) { |
||||||
|
String line = br.readLine(); |
||||||
|
if(line==null) { |
||||||
|
break; |
||||||
|
} |
||||||
|
out.println(line); |
||||||
|
} |
||||||
|
out.close(); |
||||||
|
br.close(); |
||||||
|
}catch(IOException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,275 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.Date; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Time; |
||||||
|
import java.sql.Types; |
||||||
|
import java.text.ParseException; |
||||||
|
import java.text.SimpleDateFormat; |
||||||
|
import java.util.Hashtable; |
||||||
|
import java.util.LinkedList; |
||||||
|
import java.util.Map; |
||||||
|
import java.util.Queue; |
||||||
|
import java.util.Random; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONArray; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class ScheduleHandler implements Runnable{ |
||||||
|
private static Logger log = LogManager.getLogger(ScheduleHandler.class); |
||||||
|
|
||||||
|
private final Object SYNC_TOKEN = new Object(); |
||||||
|
private boolean shutdown = false; |
||||||
|
private boolean terminated = false; |
||||||
|
private Queue<JSONObject> queue = new LinkedList<>(); |
||||||
|
|
||||||
|
|
||||||
|
public ScheduleHandler() { |
||||||
|
new Thread(this).start(); |
||||||
|
} |
||||||
|
|
||||||
|
public void add(JSONObject obj) { |
||||||
|
if(obj!=null) { |
||||||
|
queue.add(obj); |
||||||
|
}else { |
||||||
|
log.warn("Trying to add a null object!"); |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdown() { |
||||||
|
shutdown = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdownAndWait() { |
||||||
|
shutdown(); |
||||||
|
while(true) { |
||||||
|
if(terminated) { |
||||||
|
break; |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null); |
||||||
|
try { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
SYNC_TOKEN.wait(1000); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private int currentSize = 0; |
||||||
|
private int commitCount = 0; |
||||||
|
private int trainCount = 0; |
||||||
|
|
||||||
|
private synchronized String getTUID() { |
||||||
|
String id = Integer.toString(++trainCount, 36); |
||||||
|
StringBuffer sb = new StringBuffer(); |
||||||
|
for(int i = id.length();i<8;i++) { |
||||||
|
sb.append("0"); |
||||||
|
} |
||||||
|
sb.append(id); |
||||||
|
return sb.toString(); |
||||||
|
} |
||||||
|
|
||||||
|
private Time parseTime(String time) { |
||||||
|
try { |
||||||
|
int hour = Integer.parseInt(time.substring(0, 2)); |
||||||
|
int min = Integer.parseInt(time.substring(2, 4)); |
||||||
|
boolean halfMin = time.length()>4 && 'H' == time.charAt(4); |
||||||
|
return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0)); |
||||||
|
}catch(RuntimeException e) { |
||||||
|
log.error("For time \"{}\":{}", time, e.getMessage(), e); |
||||||
|
} |
||||||
|
return null; |
||||||
|
} |
||||||
|
|
||||||
|
private String parseSTime(String time) { |
||||||
|
if("H".equals(time)) |
||||||
|
return "00:00:30"; |
||||||
|
int min = Integer.parseInt(time.substring(0,1)); |
||||||
|
boolean half = time.length()>1 && 'H' == time.charAt(1); |
||||||
|
return "00:0"+min+(half?":30":":00"); |
||||||
|
} |
||||||
|
|
||||||
|
private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{ |
||||||
|
if(time==null||"".equals(time)) { |
||||||
|
stmt.setNull(pos, Types.TIME); |
||||||
|
}else { |
||||||
|
stmt.setTime(pos, parseTime(time)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{ |
||||||
|
if(time==null||"".equals(time)) { |
||||||
|
stmt.setNull(pos, Types.TIME); |
||||||
|
}else { |
||||||
|
stmt.setString(pos, parseSTime(time)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
try( |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?,?)"); |
||||||
|
PreparedStatement sDetail = conn.prepareStatement("INSERT INTO train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); |
||||||
|
PreparedStatement sError = conn.prepareStatement("INSERT INTO train_error VALUES (?,?)"); |
||||||
|
PreparedStatement sLoca = conn.prepareStatement("INSERT INTO train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)") |
||||||
|
){ |
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); |
||||||
|
while(true) { |
||||||
|
JSONObject obj = queue.poll(); |
||||||
|
if(shutdown && (queue.isEmpty()||obj==null)) { |
||||||
|
sMain.executeBatch(); |
||||||
|
sDetail.executeBatch(); |
||||||
|
sError.executeBatch(); |
||||||
|
sLoca.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Train schedule committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); |
||||||
|
break; |
||||||
|
}else if(obj==null) { |
||||||
|
log.debug("Empty queue. Wait for more entries"); |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
log.debug("Woke up."); |
||||||
|
continue; |
||||||
|
} |
||||||
|
synchronized(FileLoader.SYNC_TOKEN) { |
||||||
|
FileLoader.SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
//Parse data
|
||||||
|
String trainUID = getTUID(); |
||||||
|
sMain.setString(1, trainUID); |
||||||
|
sMain.setString(2, obj.optString("CIF_train_uid")); |
||||||
|
try { |
||||||
|
sMain.setDate(3, new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime())); |
||||||
|
sMain.setDate(4, new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime())); |
||||||
|
} catch (ParseException e1) { |
||||||
|
log.error(e1.getMessage(), e1); |
||||||
|
continue; |
||||||
|
} |
||||||
|
sMain.setString(5, obj.optString("schedule_days_runs")); |
||||||
|
sMain.setString(6, obj.optString("train_status")); |
||||||
|
sMain.setString(7, obj.optString("atoc_code")); |
||||||
|
sMain.addBatch(); |
||||||
|
currentSize++; |
||||||
|
JSONObject detail = obj.optJSONObject("schedule_segment"); |
||||||
|
setString(sDetail, 1, trainUID); |
||||||
|
setString(sDetail, 2, detail.optString("CIF_train_category")); |
||||||
|
setString(sDetail, 3, detail.optString("signalling_id")); |
||||||
|
setString(sDetail, 4, detail.optString("CIF_headcode")); |
||||||
|
setString(sDetail, 5, detail.optString("CIF_course_indicator")); |
||||||
|
setString(sDetail, 6, detail.optString("CIF_train_service_code")); |
||||||
|
setString(sDetail, 7, detail.optString("CIF_business_sector")); |
||||||
|
setString(sDetail, 8, detail.optString("CIF_power_type")); |
||||||
|
setString(sDetail, 9, detail.optString("CIF_timing_load")); |
||||||
|
setInt (sDetail,10, detail.optString("CIF_speed")); |
||||||
|
setString(sDetail,11, detail.optString("CIF_operating_characteristics")); |
||||||
|
setString(sDetail,12, detail.optString("CIF_sleepers")); |
||||||
|
setString(sDetail,13, detail.optString("CIF_reservations")); |
||||||
|
setString(sDetail,14, detail.optString("CIF_connection_indicator")); |
||||||
|
setString(sDetail,15, detail.optString("CIF_catering_code")); |
||||||
|
setString(sDetail,16, detail.optString("CIF_service_branding")); |
||||||
|
|
||||||
|
sDetail.addBatch(); |
||||||
|
currentSize++; |
||||||
|
JSONArray locations = detail.optJSONArray("schedule_location"); |
||||||
|
if(locations!=null) { |
||||||
|
for(int i=0;i<locations.length();i++) { |
||||||
|
JSONObject location = locations.getJSONObject(i); |
||||||
|
String type = location.getString("location_type"); |
||||||
|
if("LO".equals(type) || "LI".equals(type) || "LT".equals(type)) { |
||||||
|
setString(sLoca, 1, trainUID); |
||||||
|
sLoca.setInt(2, i); |
||||||
|
setString(sLoca, 3, location.optString("tiploc_code")); |
||||||
|
setTime (sLoca, 4, location.optString("arrival")); |
||||||
|
setTime (sLoca, 5, location.optString("public_arrival")); |
||||||
|
setTime (sLoca, 6, location.optString("departure")); |
||||||
|
setTime (sLoca, 7, location.optString("public_departure")); |
||||||
|
setTime (sLoca, 8, location.optString("pass")); |
||||||
|
setString(sLoca, 9, location.optString("platform")); |
||||||
|
setString(sLoca,10, location.optString("line")); |
||||||
|
setString(sLoca,11, location.optString("path")); |
||||||
|
setSTime (sLoca,12, location.optString("engineering_allowance")); |
||||||
|
setSTime (sLoca,13, location.optString("pathing_allowance")); |
||||||
|
setSTime (sLoca,14, location.optString("performance_allowance")); |
||||||
|
sLoca.addBatch(); |
||||||
|
currentSize++; |
||||||
|
}else { |
||||||
|
log.fatal("Unknown entry type {}", type); |
||||||
|
System.exit(3); |
||||||
|
} |
||||||
|
} |
||||||
|
}else { |
||||||
|
sError.setString(1, trainUID); |
||||||
|
sError.setString(2, obj.toString()); |
||||||
|
// sError.addBatch();
|
||||||
|
// currentSize++;
|
||||||
|
} |
||||||
|
if(currentSize>=Constants.BATCH_SIZE) { |
||||||
|
sMain.executeBatch(); |
||||||
|
sDetail.executeBatch(); |
||||||
|
sError.executeBatch(); |
||||||
|
sLoca.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Train schedule committed {} entry. Commit #{}", currentSize, ++commitCount); |
||||||
|
currentSize = 0; |
||||||
|
} |
||||||
|
if(queue.isEmpty()) { |
||||||
|
// log.debug("Empty queue. Wait for more entries");
|
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
// log.debug("Woke up.");
|
||||||
|
} |
||||||
|
} |
||||||
|
}catch(SQLException e) { |
||||||
|
log.error(e.getMessage(),e); |
||||||
|
} |
||||||
|
terminated = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
private void setInt(PreparedStatement stmt, int i, String string) throws SQLException{ |
||||||
|
if(string==null || "".equals(string)) |
||||||
|
stmt.setNull(i, Types.INTEGER); |
||||||
|
else |
||||||
|
stmt.setInt(i, Integer.parseInt(string)); |
||||||
|
} |
||||||
|
|
||||||
|
private void setString(PreparedStatement stmt, int i, String string) throws SQLException{ |
||||||
|
if(string==null) |
||||||
|
stmt.setNull(i, Types.VARCHAR); |
||||||
|
else |
||||||
|
stmt.setString(i, string); |
||||||
|
} |
||||||
|
|
||||||
|
public int getQueueSize() { |
||||||
|
return queue.size(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,244 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.io.BufferedReader; |
||||||
|
import java.io.File; |
||||||
|
import java.io.FileInputStream; |
||||||
|
import java.io.FileNotFoundException; |
||||||
|
import java.io.FileReader; |
||||||
|
import java.io.IOException; |
||||||
|
import java.io.InputStreamReader; |
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Time; |
||||||
|
import java.sql.Types; |
||||||
|
import java.text.DateFormat; |
||||||
|
import java.text.FieldPosition; |
||||||
|
import java.text.ParseException; |
||||||
|
import java.text.ParsePosition; |
||||||
|
import java.text.SimpleDateFormat; |
||||||
|
import java.util.Date; |
||||||
|
import java.util.Hashtable; |
||||||
|
import java.util.Random; |
||||||
|
import java.util.zip.GZIPInputStream; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONArray; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class ScheduleProcessor implements Runnable { |
||||||
|
|
||||||
|
private File fileName; |
||||||
|
private Logger log = LogManager.getLogger(getClass()); |
||||||
|
private static TUIDDateFormat tdf = new TUIDDateFormat(); |
||||||
|
|
||||||
|
public static final String SEQ_ID = "0123456789"; |
||||||
|
private static Random r = new Random(); |
||||||
|
private static class TUIDDateFormat extends DateFormat{ |
||||||
|
|
||||||
|
public static final String MONTH_ID = "MBTQPHSONDUE"; |
||||||
|
public static final String DAY_ID = "0123456789ABCDEFGHJKLMNPRSTUVWX"; |
||||||
|
|
||||||
|
|
||||||
|
@Override |
||||||
|
public StringBuffer format(Date arg0, StringBuffer arg1, FieldPosition arg2) { |
||||||
|
// TODO Auto-generated method stub
|
||||||
|
arg1.append(MONTH_ID.charAt(arg0.getMonth())); |
||||||
|
arg1.append(DAY_ID.charAt(arg0.getDate()-1)); |
||||||
|
return arg1; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Date parse(String arg0, ParsePosition arg1) { |
||||||
|
throw new UnsupportedOperationException(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
ScheduleProcessor(File fileName){ |
||||||
|
this.fileName = fileName; |
||||||
|
} |
||||||
|
private static int trainCount = 0; |
||||||
|
|
||||||
|
private static synchronized String getTUID() { |
||||||
|
String id = Integer.toString(++trainCount, 36); |
||||||
|
StringBuffer sb = new StringBuffer(); |
||||||
|
for(int i = id.length();i<8;i++) { |
||||||
|
sb.append("0"); |
||||||
|
} |
||||||
|
sb.append(id); |
||||||
|
return sb.toString(); |
||||||
|
} |
||||||
|
|
||||||
|
private static Hashtable<String, Integer> uidMap = new Hashtable<>(); |
||||||
|
|
||||||
|
private static synchronized String getTUID(String uid, Date startDate, Date endDate) { |
||||||
|
String buid = uid+tdf.format(startDate)+tdf.format(endDate); |
||||||
|
int count = 0; |
||||||
|
if(uidMap.containsKey(buid)) { |
||||||
|
count = uidMap.get(buid); |
||||||
|
count +=1; |
||||||
|
uidMap.put(buid, count); |
||||||
|
}else { |
||||||
|
uidMap.put(buid, 0); |
||||||
|
} |
||||||
|
return buid+SEQ_ID.charAt(count); |
||||||
|
} |
||||||
|
|
||||||
|
private String parseTime(String time) { |
||||||
|
try { |
||||||
|
int hour = Integer.parseInt(time.substring(0, 2)); |
||||||
|
int min = Integer.parseInt(time.substring(2, 4)); |
||||||
|
boolean halfMin = time.length()>4 && 'H' == time.charAt(4); |
||||||
|
return hour+":"+min+(halfMin?":30":":00"); |
||||||
|
}catch(RuntimeException e) { |
||||||
|
log.error("For time \"{}\":{}", time, e.getMessage(), e); |
||||||
|
} |
||||||
|
return null; |
||||||
|
} |
||||||
|
|
||||||
|
private String parseSTime(String time) { |
||||||
|
if("H".equals(time)) |
||||||
|
return "00:00:30"; |
||||||
|
int min = Integer.parseInt(time.substring(0,1)); |
||||||
|
boolean half = time.length()>1 && 'H' == time.charAt(1); |
||||||
|
return "00:0"+min+(half?":30":":00"); |
||||||
|
} |
||||||
|
|
||||||
|
private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{ |
||||||
|
if(time==null||"".equals(time)) { |
||||||
|
stmt.setNull(pos, Types.TIME); |
||||||
|
}else { |
||||||
|
stmt.setString(pos, parseTime(time)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{ |
||||||
|
if(time==null||"".equals(time)) { |
||||||
|
stmt.setNull(pos, Types.TIME); |
||||||
|
}else { |
||||||
|
stmt.setString(pos, parseSTime(time)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
log.info("Processing {}", fileName.getName()); |
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); |
||||||
|
try( |
||||||
|
BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(fileName)))); |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
PreparedStatement sMain = conn.prepareStatement("INSERT INTO n_train_schedule VALUES (?,?,?,?,?,?,?)"); |
||||||
|
PreparedStatement sDetail = conn.prepareStatement("INSERT INTO n_train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); |
||||||
|
PreparedStatement sError = conn.prepareStatement("INSERT INTO n_train_error VALUES (?,?)"); |
||||||
|
PreparedStatement sLoca = conn.prepareStatement("INSERT INTO n_train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)") |
||||||
|
){ |
||||||
|
while(true) { |
||||||
|
String line = br.readLine(); |
||||||
|
if(line==null) { |
||||||
|
break; |
||||||
|
} |
||||||
|
String trainUID; |
||||||
|
JSONObject obj = new JSONObject(line); |
||||||
|
sMain.setString(2, obj.optString("CIF_train_uid")); |
||||||
|
try { |
||||||
|
java.sql.Date startDate = new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime()); |
||||||
|
java.sql.Date endDate = new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime()); |
||||||
|
trainUID = getTUID(obj.optString("CIF_train_uid"), startDate, endDate); |
||||||
|
sMain.setDate(3, startDate); |
||||||
|
sMain.setDate(4, endDate); |
||||||
|
} catch (ParseException e1) { |
||||||
|
log.error(e1.getMessage(), e1); |
||||||
|
continue; |
||||||
|
} |
||||||
|
sMain.setString(1, trainUID); |
||||||
|
sMain.setString(5, obj.optString("schedule_days_runs")); |
||||||
|
sMain.setString(6, obj.optString("train_status")); |
||||||
|
sMain.setString(7, obj.optString("atoc_code")); |
||||||
|
sMain.addBatch(); |
||||||
|
JSONObject detail = obj.optJSONObject("schedule_segment"); |
||||||
|
setString(sDetail, 1, trainUID); |
||||||
|
setString(sDetail, 2, detail.optString("CIF_train_category")); |
||||||
|
setString(sDetail, 3, detail.optString("signalling_id")); |
||||||
|
setString(sDetail, 4, detail.optString("CIF_headcode")); |
||||||
|
setString(sDetail, 5, detail.optString("CIF_course_indicator")); |
||||||
|
setString(sDetail, 6, detail.optString("CIF_train_service_code")); |
||||||
|
setString(sDetail, 7, detail.optString("CIF_business_sector")); |
||||||
|
setString(sDetail, 8, detail.optString("CIF_power_type")); |
||||||
|
setString(sDetail, 9, detail.optString("CIF_timing_load")); |
||||||
|
setInt (sDetail,10, detail.optString("CIF_speed")); |
||||||
|
setString(sDetail,11, detail.optString("CIF_operating_characteristics")); |
||||||
|
setString(sDetail,12, detail.optString("CIF_sleepers")); |
||||||
|
setString(sDetail,13, detail.optString("CIF_reservations")); |
||||||
|
setString(sDetail,14, detail.optString("CIF_connection_indicator")); |
||||||
|
setString(sDetail,15, detail.optString("CIF_catering_code")); |
||||||
|
setString(sDetail,16, detail.optString("CIF_service_branding")); |
||||||
|
|
||||||
|
sDetail.addBatch(); |
||||||
|
JSONArray locations = detail.optJSONArray("schedule_location"); |
||||||
|
if(locations!=null) { |
||||||
|
for(int i=0;i<locations.length();i++) { |
||||||
|
JSONObject location = locations.getJSONObject(i); |
||||||
|
String type = location.getString("location_type"); |
||||||
|
if("LO".equals(type) || "LI".equals(type) || "LT".equals(type)) { |
||||||
|
setString(sLoca, 1, trainUID); |
||||||
|
sLoca.setInt(2, i); |
||||||
|
setString(sLoca, 3, location.optString("tiploc_code")); |
||||||
|
setTime (sLoca, 4, location.optString("arrival")); |
||||||
|
setTime (sLoca, 5, location.optString("public_arrival")); |
||||||
|
setTime (sLoca, 6, location.optString("departure")); |
||||||
|
setTime (sLoca, 7, location.optString("public_departure")); |
||||||
|
setTime (sLoca, 8, location.optString("pass")); |
||||||
|
setString(sLoca, 9, location.optString("platform")); |
||||||
|
setString(sLoca,10, location.optString("line")); |
||||||
|
setString(sLoca,11, location.optString("path")); |
||||||
|
setSTime (sLoca,12, location.optString("engineering_allowance")); |
||||||
|
setSTime (sLoca,13, location.optString("pathing_allowance")); |
||||||
|
setSTime (sLoca,14, location.optString("performance_allowance")); |
||||||
|
sLoca.addBatch(); |
||||||
|
}else { |
||||||
|
log.fatal("Unknown entry type {}", type); |
||||||
|
System.exit(3); |
||||||
|
} |
||||||
|
} |
||||||
|
}else { |
||||||
|
sError.setString(1, trainUID); |
||||||
|
sError.setString(2, obj.toString()); |
||||||
|
// sError.addBatch();
|
||||||
|
// currentSize++;
|
||||||
|
} |
||||||
|
} |
||||||
|
sMain.executeBatch(); |
||||||
|
sDetail.executeBatch(); |
||||||
|
sError.executeBatch(); |
||||||
|
sLoca.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Batch {} committed.", fileName.getName()); |
||||||
|
} catch (FileNotFoundException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} catch (IOException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} catch (SQLException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
if(!fileName.delete()) { |
||||||
|
log.warn("Unable to delete {}", fileName.getName()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private void setInt(PreparedStatement stmt, int i, String string) throws SQLException{ |
||||||
|
if(string==null || "".equals(string)) |
||||||
|
stmt.setNull(i, Types.INTEGER); |
||||||
|
else |
||||||
|
stmt.setInt(i, Integer.parseInt(string)); |
||||||
|
} |
||||||
|
|
||||||
|
private void setString(PreparedStatement stmt, int i, String string) throws SQLException{ |
||||||
|
if(string==null) |
||||||
|
stmt.setNull(i, Types.VARCHAR); |
||||||
|
else |
||||||
|
stmt.setString(i, string); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,77 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.util.LinkedList; |
||||||
|
import java.util.Queue; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class TimetableHandler implements Runnable{ |
||||||
|
private static Logger log = LogManager.getLogger(TimetableHandler.class); |
||||||
|
|
||||||
|
private final Object SYNC_TOKEN = new Object(); |
||||||
|
private boolean shutdown = false; |
||||||
|
private boolean terminated = false; |
||||||
|
private Queue<JSONObject> queue = new LinkedList<>(); |
||||||
|
|
||||||
|
|
||||||
|
public TimetableHandler() { |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
public void add(JSONObject obj) { |
||||||
|
queue.add(obj); |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdown() { |
||||||
|
shutdown = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdownAndWait() { |
||||||
|
shutdown(); |
||||||
|
while(true) { |
||||||
|
if(terminated) { |
||||||
|
break; |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
while(true) { |
||||||
|
if(shutdown && queue.isEmpty()) { |
||||||
|
break; |
||||||
|
} |
||||||
|
//TODO: actual process
|
||||||
|
if(!queue.isEmpty()) { |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
terminated = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,108 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.text.ParseException; |
||||||
|
import java.text.SimpleDateFormat; |
||||||
|
import java.util.LinkedList; |
||||||
|
import java.util.Queue; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class TimetableHandler2 implements Runnable{ |
||||||
|
private static Logger log = LogManager.getLogger(TimetableHandler2.class); |
||||||
|
|
||||||
|
private final Object SYNC_TOKEN = new Object(); |
||||||
|
private boolean shutdown = false; |
||||||
|
private boolean terminated = false; |
||||||
|
private Queue<JSONObject> queue = new LinkedList<>(); |
||||||
|
|
||||||
|
|
||||||
|
public TimetableHandler2() { |
||||||
|
new Thread(this).start(); |
||||||
|
} |
||||||
|
|
||||||
|
public void add(JSONObject obj) { |
||||||
|
queue.add(obj); |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdown() { |
||||||
|
shutdown = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdownAndWait() { |
||||||
|
shutdown(); |
||||||
|
while(true) { |
||||||
|
if(terminated) { |
||||||
|
break; |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
log.debug("Waiting for termination."); |
||||||
|
try { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
SYNC_TOKEN.wait(1000); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
try( |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
PreparedStatement pstmt = conn.prepareStatement("") |
||||||
|
){ |
||||||
|
int currentSize = 0; |
||||||
|
int commitCount = 0; |
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); |
||||||
|
while(true) { |
||||||
|
JSONObject obj = queue.poll(); |
||||||
|
if(shutdown && (queue.isEmpty()||obj==null)) { |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Train assoc committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); |
||||||
|
break; |
||||||
|
}else if(obj==null) { |
||||||
|
continue; |
||||||
|
} |
||||||
|
//Parse data
|
||||||
|
pstmt.addBatch(); |
||||||
|
if(++currentSize>=Constants.BATCH_SIZE) { |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Train assoc committed {} entry. Commit #{}", currentSize, ++commitCount); |
||||||
|
currentSize = 0; |
||||||
|
} |
||||||
|
if(queue.isEmpty()) { |
||||||
|
log.debug("Empty queue. Wait for more entries"); |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
}catch(SQLException e) { |
||||||
|
log.error(e.getMessage(),e); |
||||||
|
} |
||||||
|
terminated = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,153 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Types; |
||||||
|
import java.text.ParseException; |
||||||
|
import java.text.SimpleDateFormat; |
||||||
|
import java.util.LinkedList; |
||||||
|
import java.util.Queue; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class TiplocHandler implements Runnable{ |
||||||
|
private static Logger log = LogManager.getLogger(TiplocHandler.class); |
||||||
|
|
||||||
|
private final Object SYNC_TOKEN = new Object(); |
||||||
|
private boolean shutdown = false; |
||||||
|
private boolean terminated = false; |
||||||
|
private Queue<JSONObject> queue = new LinkedList<>(); |
||||||
|
|
||||||
|
|
||||||
|
public TiplocHandler() { |
||||||
|
new Thread(this).start(); |
||||||
|
} |
||||||
|
|
||||||
|
public void add(JSONObject obj) { |
||||||
|
if(obj!=null) { |
||||||
|
// log.debug("Size {} >>", queue.size());
|
||||||
|
queue.add(obj); |
||||||
|
}else { |
||||||
|
log.warn("Trying to add a null object!"); |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdown() { |
||||||
|
shutdown = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public void shutdownAndWait() { |
||||||
|
shutdown(); |
||||||
|
while(true) { |
||||||
|
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null); |
||||||
|
|
||||||
|
if(terminated) { |
||||||
|
break; |
||||||
|
} |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
SYNC_TOKEN.wait(1000); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
private int currentSize = 0; |
||||||
|
private int commitCount = 0; |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
try( |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)") |
||||||
|
){ |
||||||
|
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
|
||||||
|
while(true) { |
||||||
|
JSONObject obj = queue.poll(); |
||||||
|
if(shutdown && (queue.isEmpty()||obj==null)) { |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("TIPLOC committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); |
||||||
|
break; |
||||||
|
}else if(obj==null) { |
||||||
|
log.debug("Empty queue. Wait for more entries"); |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
log.debug("Woke up."); |
||||||
|
continue; |
||||||
|
} |
||||||
|
synchronized(FileLoader.SYNC_TOKEN) { |
||||||
|
FileLoader.SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
//Parse data
|
||||||
|
pstmt.setString(1, obj.optString("tiploc_code")); |
||||||
|
if(obj.has("nalco")) |
||||||
|
pstmt.setString(2, obj.optString("nalco")); |
||||||
|
else |
||||||
|
pstmt.setNull(2, Types.VARCHAR); |
||||||
|
if(obj.has("stanox")) |
||||||
|
pstmt.setString(3, obj.optString("stanox")); |
||||||
|
else |
||||||
|
pstmt.setNull(3, Types.VARCHAR); |
||||||
|
if(obj.has("crs_code")) |
||||||
|
pstmt.setString(4, obj.optString("crs_code")); |
||||||
|
else |
||||||
|
pstmt.setNull(4, Types.VARCHAR); |
||||||
|
if(obj.has("description")) |
||||||
|
pstmt.setString(5, obj.optString("description")); |
||||||
|
else |
||||||
|
pstmt.setNull(5, Types.VARCHAR); |
||||||
|
if(obj.has("tps_description")) |
||||||
|
pstmt.setString(6, obj.optString("tps_description")); |
||||||
|
else |
||||||
|
pstmt.setNull(6, Types.VARCHAR); |
||||||
|
|
||||||
|
pstmt.addBatch(); |
||||||
|
if(++currentSize>=Constants.BATCH_SIZE) { |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("TIPLOC committed {} entry. Commit #{}", currentSize, ++commitCount); |
||||||
|
currentSize = 0; |
||||||
|
} |
||||||
|
if(queue.isEmpty()) { |
||||||
|
// log.debug("Empty queue. Wait for more entries");
|
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
try { |
||||||
|
SYNC_TOKEN.wait(); |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
}catch(SQLException e) { |
||||||
|
log.error(e.getMessage(),e); |
||||||
|
} |
||||||
|
terminated = true; |
||||||
|
synchronized(SYNC_TOKEN) { |
||||||
|
SYNC_TOKEN.notifyAll(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
public int getQueueSize() { |
||||||
|
return queue.size(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,85 @@ |
|||||||
|
package org.leolo.rail.nrd; |
||||||
|
|
||||||
|
import java.io.BufferedReader; |
||||||
|
import java.io.File; |
||||||
|
import java.io.FileInputStream; |
||||||
|
import java.io.FileNotFoundException; |
||||||
|
import java.io.FileReader; |
||||||
|
import java.io.IOException; |
||||||
|
import java.io.InputStreamReader; |
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.SQLException; |
||||||
|
import java.sql.Types; |
||||||
|
import java.text.ParseException; |
||||||
|
import java.text.SimpleDateFormat; |
||||||
|
import java.util.zip.GZIPInputStream; |
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager; |
||||||
|
import org.apache.logging.log4j.Logger; |
||||||
|
import org.json.JSONObject; |
||||||
|
|
||||||
|
public class TiplocProcessor implements Runnable { |
||||||
|
|
||||||
|
private File fileName; |
||||||
|
private Logger log = LogManager.getLogger(getClass()); |
||||||
|
|
||||||
|
TiplocProcessor(File fileName){ |
||||||
|
this.fileName = fileName; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
log.info("Processing {}", fileName.getName()); |
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); |
||||||
|
try( |
||||||
|
BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(fileName)))); |
||||||
|
Connection conn = DatabaseManager.getInstance().getConnection(); |
||||||
|
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO n_tiploc VALUES (?,?,?,?,?,?)") |
||||||
|
){ |
||||||
|
while(true) { |
||||||
|
String line = br.readLine(); |
||||||
|
if(line==null) { |
||||||
|
break; |
||||||
|
} |
||||||
|
JSONObject obj = new JSONObject(line); |
||||||
|
pstmt.setString(1, obj.optString("tiploc_code")); |
||||||
|
if(obj.has("nalco")) |
||||||
|
pstmt.setString(2, obj.optString("nalco")); |
||||||
|
else |
||||||
|
pstmt.setNull(2, Types.VARCHAR); |
||||||
|
if(obj.has("stanox")) |
||||||
|
pstmt.setString(3, obj.optString("stanox")); |
||||||
|
else |
||||||
|
pstmt.setNull(3, Types.VARCHAR); |
||||||
|
if(obj.has("crs_code")) |
||||||
|
pstmt.setString(4, obj.optString("crs_code")); |
||||||
|
else |
||||||
|
pstmt.setNull(4, Types.VARCHAR); |
||||||
|
if(obj.has("description")) |
||||||
|
pstmt.setString(5, obj.optString("description")); |
||||||
|
else |
||||||
|
pstmt.setNull(5, Types.VARCHAR); |
||||||
|
if(obj.has("tps_description")) |
||||||
|
pstmt.setString(6, obj.optString("tps_description")); |
||||||
|
else |
||||||
|
pstmt.setNull(6, Types.VARCHAR); |
||||||
|
|
||||||
|
pstmt.addBatch(); |
||||||
|
} |
||||||
|
pstmt.executeBatch(); |
||||||
|
conn.commit(); |
||||||
|
log.info("Batch {} committed.", fileName.getName()); |
||||||
|
} catch (FileNotFoundException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} catch (IOException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} catch (SQLException e) { |
||||||
|
log.error(e.getMessage(), e); |
||||||
|
} |
||||||
|
if(!fileName.delete()) { |
||||||
|
log.warn("Unable to delete {}", fileName.getName()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,88 @@ |
|||||||
|
DROP TABLE IF EXISTS `n_tiploc`; |
||||||
|
CREATE TABLE IF NOT EXISTS `n_tiploc` ( |
||||||
|
`tiploc_code` varchar(8) NOT NULL, |
||||||
|
`nalco` varchar(6) DEFAULT NULL, |
||||||
|
`stanox` varchar(5) DEFAULT NULL, |
||||||
|
`crs` varchar(3) DEFAULT NULL, |
||||||
|
`description` varchar(100) DEFAULT NULL, |
||||||
|
`tps_desc` varchar(100) DEFAULT NULL, |
||||||
|
PRIMARY KEY (`tiploc_code`), |
||||||
|
KEY `nalco` (`nalco`), |
||||||
|
KEY `stanox` (`stanox`), |
||||||
|
KEY `crs` (`crs`) |
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=latin1; |
||||||
|
DROP TABLE IF EXISTS `n_train_assoc`; |
||||||
|
CREATE TABLE IF NOT EXISTS `n_train_assoc` ( |
||||||
|
`assoc_id` int(11) NOT NULL AUTO_INCREMENT, |
||||||
|
`main_train_uid` char(8) NOT NULL, |
||||||
|
`assoc_train_uid` char(8) NOT NULL, |
||||||
|
`start_date` date NOT NULL, |
||||||
|
`end_date` date NOT NULL, |
||||||
|
`days` char(7) NOT NULL DEFAULT '111111', |
||||||
|
`assoc_cat` char(2) NOT NULL DEFAULT '', |
||||||
|
`date_ind` char(1) NOT NULL DEFAULT '', |
||||||
|
`tiploc` char(8) NOT NULL DEFAULT '', |
||||||
|
`base_suffix` char(7) DEFAULT '', |
||||||
|
`assoc_suffix` char(7) DEFAULT '', |
||||||
|
`diagram_type` char(1) NOT NULL DEFAULT '', |
||||||
|
`stp_ind` char(1) NOT NULL DEFAULT '', |
||||||
|
PRIMARY KEY (`assoc_id`), |
||||||
|
KEY `main_train_uid` (`main_train_uid`), |
||||||
|
KEY `assoc_train_uid` (`assoc_train_uid`) |
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=latin1; |
||||||
|
DROP TABLE IF EXISTS `n_train_error`; |
||||||
|
CREATE TABLE IF NOT EXISTS `n_train_error` ( |
||||||
|
`train_uid` char(8) NOT NULL, |
||||||
|
`segment_data` text NOT NULL, |
||||||
|
PRIMARY KEY (`train_uid`) |
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=latin1; |
||||||
|
DROP TABLE IF EXISTS `n_train_schedule`; |
||||||
|
CREATE TABLE IF NOT EXISTS `n_train_schedule` ( |
||||||
|
`uid` char(12) NOT NULL, |
||||||
|
`train_uid` char(6) NOT NULL, |
||||||
|
`start_date` date NOT NULL, |
||||||
|
`end_date` date NOT NULL, |
||||||
|
`days` char(7) NOT NULL DEFAULT '1111111', |
||||||
|
`status` char(10) NOT NULL, |
||||||
|
`atoc` char(2) NOT NULL, |
||||||
|
PRIMARY KEY (`uid`) USING BTREE, |
||||||
|
KEY `train_uid` (`train_uid`) |
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=latin1; |
||||||
|
DROP TABLE IF EXISTS `n_train_schedule_detail`; |
||||||
|
CREATE TABLE IF NOT EXISTS `n_train_schedule_detail` ( |
||||||
|
`uid` char(12) NOT NULL, |
||||||
|
`category` char(2) DEFAULT NULL, |
||||||
|
`signal_id` char(4) DEFAULT NULL, |
||||||
|
`headcode` char(4) DEFAULT NULL, |
||||||
|
`course_ind` char(1) DEFAULT NULL, |
||||||
|
`service_code` char(8) DEFAULT NULL, |
||||||
|
`bus_sector` char(2) DEFAULT NULL, |
||||||
|
`power_type` char(3) DEFAULT NULL, |
||||||
|
`timing_load` char(4) DEFAULT NULL, |
||||||
|
`speed` mediumint(8) unsigned DEFAULT NULL, |
||||||
|
`op_chars` char(6) DEFAULT NULL, |
||||||
|
`sleeper` char(1) DEFAULT NULL, |
||||||
|
`resv` char(1) DEFAULT NULL, |
||||||
|
`conn_ind` char(1) DEFAULT NULL, |
||||||
|
`catering` char(4) DEFAULT NULL, |
||||||
|
`branding` char(4) DEFAULT NULL, |
||||||
|
PRIMARY KEY (`uid`) |
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=latin1; |
||||||
|
DROP TABLE IF EXISTS `n_train_schedule_location`; |
||||||
|
CREATE TABLE IF NOT EXISTS `n_train_schedule_location` ( |
||||||
|
`tuid` char(12) NOT NULL, |
||||||
|
`seq` int(10) unsigned NOT NULL, |
||||||
|
`tiploc` char(8) DEFAULT NULL, |
||||||
|
`arrival` time DEFAULT NULL, |
||||||
|
`pub_arrival` time DEFAULT NULL, |
||||||
|
`departure` time DEFAULT NULL, |
||||||
|
`pub_departure` time DEFAULT NULL, |
||||||
|
`pass` time DEFAULT NULL, |
||||||
|
`platform` char(3) DEFAULT NULL, |
||||||
|
`line` char(3) DEFAULT NULL, |
||||||
|
`path` char(3) DEFAULT NULL, |
||||||
|
`eng_allowance` time DEFAULT NULL, |
||||||
|
`path_allowance` time DEFAULT NULL, |
||||||
|
`perf_allowance` time DEFAULT NULL, |
||||||
|
PRIMARY KEY (`tuid`,`seq`) |
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=latin1; |
||||||
Loading…
Reference in new issue