Browse Source

New method to handle the data

develop
LO Kam Tao Leo 4 years ago
parent
commit
c55082f738
  1. 83
      src/org/leolo/rail/nrd/AssoicationProcessor.java
  2. 4
      src/org/leolo/rail/nrd/Constants.java
  3. 91
      src/org/leolo/rail/nrd/FileLoader.java
  4. 192
      src/org/leolo/rail/nrd/ScheduleProcessor.java
  5. 82
      src/org/leolo/rail/nrd/TiplocProcessor.java

83
src/org/leolo/rail/nrd/AssoicationProcessor.java

@ -0,0 +1,83 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class AssoicationProcessor implements Runnable {
private File fileName;
private Logger log = LogManager.getLogger(getClass());
AssoicationProcessor(File fileName){
this.fileName = fileName;
}
@Override
public void run() {
log.info("Processing {}", fileName.getName());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
try(
BufferedReader br = new BufferedReader(new FileReader(fileName));
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)")
){
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
JSONObject obj = new JSONObject(line);
pstmt.setString(1, obj.optString("main_train_uid"));
pstmt.setString(2, obj.optString("assoc_train_uid"));
try {
pstmt.setDate(3, new java.sql.Date(sdf.parse(obj.optString("assoc_start_date")).getTime()));
pstmt.setDate(4, new java.sql.Date(sdf.parse(obj.optString("assoc_end_date")).getTime()));
} catch (ParseException e1) {
log.warn("Unable to parse date! {}", e1.getMessage(), e1);
continue;
}
pstmt.setString(5, obj.optString("assoc_days"));
pstmt.setString(6, obj.optString("category"));
pstmt.setString(7, obj.optString("date_indicator"));
pstmt.setString(8, obj.optString("location"));
if(obj.optString("base_location_suffix")!=null)
pstmt.setString(9, obj.optString("base_location_suffix"));
else
pstmt.setNull(9, Types.VARCHAR);
if(obj.optString("assoc_location_suffix")!=null)
pstmt.setString(10, obj.optString("assoc_location_suffix"));
else
pstmt.setNull(10, Types.VARCHAR);
pstmt.setString(11, obj.optString("diagram_type"));
pstmt.setString(12, obj.optString("CIF_stp_indicator"));
pstmt.addBatch();
}
pstmt.executeBatch();
conn.commit();
log.info("Batch committed.");
} catch (FileNotFoundException e) {
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
if(!fileName.delete()) {
log.warn("Unable to delete {}", fileName.getName());
}
}
}

4
src/org/leolo/rail/nrd/Constants.java

@ -1,6 +1,8 @@
package org.leolo.rail.nrd;
public class Constants {
public static final int BATCH_SIZE = 10000;
public static final int BATCH_SIZE = 1000;
@Deprecated
public static final int MAX_QUEUE_SIZE = 40000;
}

91
src/org/leolo/rail/nrd/FileLoader.java

@ -1,26 +1,17 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URL;
import java.net.URLConnection;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Base64;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;
@ -69,40 +60,52 @@ public class FileLoader {
System.exit(4);
}
log.info("Connected!");
File tempDir = new File(ConfigurationManager.getInstance().getProperty("file.temp_dir", ".")+"/nrdg");
if(!tempDir.exists()) {
tempDir.mkdirs();
}
DatabaseManager.getInstance().clear();
int countA=0, countT=0, countS=0;
int batchA=0, batchT=0, batchS=0;
try(
GZIPInputStream gis = new GZIPInputStream(fis);
BufferedReader br = new BufferedReader(new InputStreamReader(gis))
BufferedReader br = new BufferedReader(new InputStreamReader(gis));
){
PrintWriter ass = new PrintWriter(new File(tempDir, "ass_0"));
PrintWriter tip = new PrintWriter(new File(tempDir, "tip_0"));
PrintWriter sch = new PrintWriter(new File(tempDir, "sch_0"));
int count = 0;
AssoicationHandler asso = new AssoicationHandler();
TiplocHandler tiploc = new TiplocHandler();
ScheduleHandler schedule = new ScheduleHandler();
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
count++;
if(asso.getQueueSize()+tiploc.getQueueSize()+schedule.getQueueSize() > Constants.MAX_QUEUE_SIZE) {
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
JSONObject obj = new JSONObject(line);
String objectType = obj.keys().next();
if("JsonTimetableV1".equals(objectType)) {
}else if("JsonAssociationV1".equals(objectType)){
asso.add(obj.getJSONObject("JsonAssociationV1"));
ass.println(obj.getJSONObject("JsonAssociationV1"));
countA++;
if(countA%Constants.BATCH_SIZE==0) {
ass.close();
ass = new PrintWriter(new File(tempDir, "ass_"+(++batchA)));
}
}else if("TiplocV1".equals(objectType)){
tiploc.add(obj.getJSONObject("TiplocV1"));
tip.println(obj.getJSONObject("TiplocV1"));
countT++;
if(countT%Constants.BATCH_SIZE==0) {
tip.close();
tip = new PrintWriter(new File(tempDir, "tip_"+(++batchT)));
}
}else if("JsonScheduleV1".equals(objectType)){
schedule.add(obj.getJSONObject("JsonScheduleV1"));
sch.println(obj.getJSONObject("JsonScheduleV1"));
countS++;
if(countS%Constants.BATCH_SIZE==0) {
sch.close();
sch = new PrintWriter(new File(tempDir, "sch_"+(++batchS)));
}
}else if("EOF".equals(objectType)){
//Nothing to do
}else {
@ -111,16 +114,15 @@ public class FileLoader {
}
}
ass.close();
tip.close();
sch.close();
log.info("Total count : {}", count);
asso.shutdownAndWait();
tiploc.shutdownAndWait();
schedule.shutdownAndWait();
}catch(ZipException e) {
log.error(e.getMessage(), e);
captureFile();
}catch(IOException e) {
log.error(e.getMessage(), e);
// log.error(e.)
System.exit(1);
}
try {
@ -129,7 +131,34 @@ public class FileLoader {
log.fatal(e.getMessage(), e);
System.exit(1);
}
log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS);
ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(Integer.parseInt(ConfigurationManager.getInstance().getProperty("thread", "20")));
for(int i=0;i<=batchA;i++) {
log.info("Queued Assoication - {}", i);
threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i)));
}
for(int i=0;i<=batchT;i++) {
log.info("Queued Tiploc - {}", i);
threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i)));
}
for(int i=0;i<=batchS;i++) {
log.info("Queued Schedule - {}", i);
threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i)));
}
threadPool.shutdown();
try {
threadPool.awaitTermination(3, TimeUnit.HOURS);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
if(!threadPool.isTerminated()) {
log.error("Some job cannot be finished!");
System.exit(50);
}
DatabaseManager.getInstance().shutdown();
if(!tempDir.delete()) {
log.warn("Unable to remove temp dir!");
}
log.info("Job finished!");
}

192
src/org/leolo/rail/nrd/ScheduleProcessor.java

@ -0,0 +1,192 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
public class ScheduleProcessor implements Runnable {
private File fileName;
private Logger log = LogManager.getLogger(getClass());
ScheduleProcessor(File fileName){
this.fileName = fileName;
}
private static int trainCount = 0;
private static synchronized String getTUID() {
String id = Integer.toString(++trainCount, 36);
StringBuffer sb = new StringBuffer();
for(int i = id.length();i<8;i++) {
sb.append("0");
}
sb.append(id);
return sb.toString();
}
private Time parseTime(String time) {
try {
int hour = Integer.parseInt(time.substring(0, 2));
int min = Integer.parseInt(time.substring(2, 4));
boolean halfMin = time.length()>4 && 'H' == time.charAt(4);
return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0));
}catch(RuntimeException e) {
log.error("For time \"{}\":{}", time, e.getMessage(), e);
}
return null;
}
private String parseSTime(String time) {
if("H".equals(time))
return "00:00:30";
int min = Integer.parseInt(time.substring(0,1));
boolean half = time.length()>1 && 'H' == time.charAt(1);
return "00:0"+min+(half?":30":":00");
}
private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setTime(pos, parseTime(time));
}
}
private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setString(pos, parseSTime(time));
}
}
@Override
public void run() {
log.info("Processing {}", fileName.getName());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
try(
BufferedReader br = new BufferedReader(new FileReader(fileName));
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?,?)");
PreparedStatement sDetail = conn.prepareStatement("INSERT INTO train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
PreparedStatement sError = conn.prepareStatement("INSERT INTO train_error VALUES (?,?)");
PreparedStatement sLoca = conn.prepareStatement("INSERT INTO train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
){
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
String trainUID = getTUID();
JSONObject obj = new JSONObject(line);sMain.setString(1, trainUID);
sMain.setString(2, obj.optString("CIF_train_uid"));
try {
sMain.setDate(3, new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime()));
sMain.setDate(4, new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime()));
} catch (ParseException e1) {
log.error(e1.getMessage(), e1);
continue;
}
sMain.setString(5, obj.optString("schedule_days_runs"));
sMain.setString(6, obj.optString("train_status"));
sMain.setString(7, obj.optString("atoc_code"));
sMain.addBatch();
JSONObject detail = obj.optJSONObject("schedule_segment");
setString(sDetail, 1, trainUID);
setString(sDetail, 2, detail.optString("CIF_train_category"));
setString(sDetail, 3, detail.optString("signalling_id"));
setString(sDetail, 4, detail.optString("CIF_headcode"));
setString(sDetail, 5, detail.optString("CIF_course_indicator"));
setString(sDetail, 6, detail.optString("CIF_train_service_code"));
setString(sDetail, 7, detail.optString("CIF_business_sector"));
setString(sDetail, 8, detail.optString("CIF_power_type"));
setString(sDetail, 9, detail.optString("CIF_timing_load"));
setInt (sDetail,10, detail.optString("CIF_speed"));
setString(sDetail,11, detail.optString("CIF_operating_characteristics"));
setString(sDetail,12, detail.optString("CIF_sleepers"));
setString(sDetail,13, detail.optString("CIF_reservations"));
setString(sDetail,14, detail.optString("CIF_connection_indicator"));
setString(sDetail,15, detail.optString("CIF_catering_code"));
setString(sDetail,16, detail.optString("CIF_service_branding"));
sDetail.addBatch();
JSONArray locations = detail.optJSONArray("schedule_location");
if(locations!=null) {
for(int i=0;i<locations.length();i++) {
JSONObject location = locations.getJSONObject(i);
String type = location.getString("location_type");
if("LO".equals(type) || "LI".equals(type) || "LT".equals(type)) {
setString(sLoca, 1, trainUID);
sLoca.setInt(2, i);
setString(sLoca, 3, location.optString("tiploc_code"));
setTime (sLoca, 4, location.optString("arrival"));
setTime (sLoca, 5, location.optString("public_arrival"));
setTime (sLoca, 6, location.optString("departure"));
setTime (sLoca, 7, location.optString("public_departure"));
setTime (sLoca, 8, location.optString("pass"));
setString(sLoca, 9, location.optString("platform"));
setString(sLoca,10, location.optString("line"));
setString(sLoca,11, location.optString("path"));
setSTime (sLoca,12, location.optString("engineering_allowance"));
setSTime (sLoca,13, location.optString("pathing_allowance"));
setSTime (sLoca,14, location.optString("performance_allowance"));
sLoca.addBatch();
}else {
log.fatal("Unknown entry type {}", type);
System.exit(3);
}
}
}else {
sError.setString(1, trainUID);
sError.setString(2, obj.toString());
// sError.addBatch();
// currentSize++;
}
}
sMain.executeBatch();
sDetail.executeBatch();
sError.executeBatch();
sLoca.executeBatch();
conn.commit();
log.info("Batch committed.");
} catch (FileNotFoundException e) {
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
if(!fileName.delete()) {
log.warn("Unable to delete {}", fileName.getName());
}
}
private void setInt(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null || "".equals(string))
stmt.setNull(i, Types.INTEGER);
else
stmt.setInt(i, Integer.parseInt(string));
}
private void setString(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null)
stmt.setNull(i, Types.VARCHAR);
else
stmt.setString(i, string);
}
}

82
src/org/leolo/rail/nrd/TiplocProcessor.java

@ -0,0 +1,82 @@
package org.leolo.rail.nrd;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
public class TiplocProcessor implements Runnable {
private File fileName;
private Logger log = LogManager.getLogger(getClass());
TiplocProcessor(File fileName){
this.fileName = fileName;
}
@Override
public void run() {
log.info("Processing {}", fileName.getName());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
try(
BufferedReader br = new BufferedReader(new FileReader(fileName));
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)")
){
while(true) {
String line = br.readLine();
if(line==null) {
break;
}
JSONObject obj = new JSONObject(line);
pstmt.setString(1, obj.optString("tiploc_code"));
if(obj.has("nalco"))
pstmt.setString(2, obj.optString("nalco"));
else
pstmt.setNull(2, Types.VARCHAR);
if(obj.has("stanox"))
pstmt.setString(3, obj.optString("stanox"));
else
pstmt.setNull(3, Types.VARCHAR);
if(obj.has("crs_code"))
pstmt.setString(4, obj.optString("crs_code"));
else
pstmt.setNull(4, Types.VARCHAR);
if(obj.has("description"))
pstmt.setString(5, obj.optString("description"));
else
pstmt.setNull(5, Types.VARCHAR);
if(obj.has("tps_description"))
pstmt.setString(6, obj.optString("tps_description"));
else
pstmt.setNull(6, Types.VARCHAR);
pstmt.addBatch();
}
pstmt.executeBatch();
conn.commit();
log.info("Batch committed.");
} catch (FileNotFoundException e) {
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
if(!fileName.delete()) {
log.warn("Unable to delete {}", fileName.getName());
}
}
}
Loading…
Cancel
Save