Browse Source

Handle a predownload file

develop
LO Kam Tao Leo 4 years ago
parent
commit
35c7e72c49
  1. 11
      src/org/leolo/rail/nrd/AssoicationHandler.java
  2. 4
      src/org/leolo/rail/nrd/Constants.java
  3. 4
      src/org/leolo/rail/nrd/DatabaseManager.java
  4. 2
      src/org/leolo/rail/nrd/FileLoader.java
  5. 160
      src/org/leolo/rail/nrd/ScheduleHandler.java
  6. 23
      src/org/leolo/rail/nrd/TiplocHandler.java

11
src/org/leolo/rail/nrd/AssoicationHandler.java

@ -27,7 +27,11 @@ public class AssoicationHandler implements Runnable{
} }
public void add(JSONObject obj) { public void add(JSONObject obj) {
if(obj!=null) {
queue.add(obj); queue.add(obj);
}else {
log.warn("Trying to add a null object!");
}
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll(); SYNC_TOKEN.notifyAll();
} }
@ -43,11 +47,12 @@ public class AssoicationHandler implements Runnable{
public void shutdownAndWait() { public void shutdownAndWait() {
shutdown(); shutdown();
while(true) { while(true) {
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null);
if(terminated) { if(terminated) {
break; break;
} }
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
log.debug("Waiting for termination.");
try { try {
SYNC_TOKEN.notifyAll(); SYNC_TOKEN.notifyAll();
SYNC_TOKEN.wait(1000); SYNC_TOKEN.wait(1000);
@ -58,14 +63,14 @@ public class AssoicationHandler implements Runnable{
} }
} }
private int currentSize = 0;
private int commitCount = 0;
@Override @Override
public void run() { public void run() {
try( try(
Connection conn = DatabaseManager.getInstance().getConnection(); Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)") PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)")
){ ){
int currentSize = 0;
int commitCount = 0;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
while(true) { while(true) {
JSONObject obj = queue.poll(); JSONObject obj = queue.poll();

4
src/org/leolo/rail/nrd/Constants.java

@ -1,6 +1,6 @@
package org.leolo.rail.nrd; package org.leolo.rail.nrd;
public class Constants { public class Constants {
public static final int BATCH_SIZE = 200; public static final int BATCH_SIZE = 500;
public static final int MAX_QUEUE_SIZE = 2000; public static final int MAX_QUEUE_SIZE = 20000;
} }

4
src/org/leolo/rail/nrd/DatabaseManager.java

@ -87,7 +87,9 @@ public class DatabaseManager {
stmt.execute("TRUNCATE TABLE train_assoc"); stmt.execute("TRUNCATE TABLE train_assoc");
stmt.execute("TRUNCATE TABLE tiploc"); stmt.execute("TRUNCATE TABLE tiploc");
stmt.execute("TRUNCATE TABLE train_schedule"); stmt.execute("TRUNCATE TABLE train_schedule");
stmt.execute("TRUNCATE TABLE train_segment"); stmt.execute("TRUNCATE TABLE train_schedule_detail");
stmt.execute("TRUNCATE TABLE train_schedule_location");
stmt.execute("TRUNCATE TABLE train_error");
}catch(SQLException e) { }catch(SQLException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }

2
src/org/leolo/rail/nrd/FileLoader.java

@ -101,6 +101,8 @@ public class FileLoader {
} }
log.info("Total count : {}", count); log.info("Total count : {}", count);
asso.shutdownAndWait(); asso.shutdownAndWait();
tiploc.shutdownAndWait();
schedule.shutdownAndWait();
}catch(IOException e) { }catch(IOException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
System.exit(1); System.exit(1);

160
src/org/leolo/rail/nrd/ScheduleHandler.java

@ -4,6 +4,8 @@ import java.sql.Connection;
import java.sql.Date; import java.sql.Date;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Time;
import java.sql.Types;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Hashtable; import java.util.Hashtable;
@ -14,6 +16,7 @@ import java.util.Random;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject; import org.json.JSONObject;
public class ScheduleHandler implements Runnable{ public class ScheduleHandler implements Runnable{
@ -30,7 +33,11 @@ public class ScheduleHandler implements Runnable{
} }
public void add(JSONObject obj) { public void add(JSONObject obj) {
if(obj!=null) {
queue.add(obj); queue.add(obj);
}else {
log.warn("Trying to add a null object!");
}
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll(); SYNC_TOKEN.notifyAll();
} }
@ -50,7 +57,7 @@ public class ScheduleHandler implements Runnable{
break; break;
} }
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
log.debug("Waiting for termination."); log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null);
try { try {
SYNC_TOKEN.notifyAll(); SYNC_TOKEN.notifyAll();
SYNC_TOKEN.wait(1000); SYNC_TOKEN.wait(1000);
@ -61,33 +68,93 @@ public class ScheduleHandler implements Runnable{
} }
} }
private int currentSize = 0;
private int commitCount = 0;
private int trainCount = 0;
private synchronized String getTUID() {
String id = Integer.toString(++trainCount, 36);
StringBuffer sb = new StringBuffer();
for(int i = id.length();i<8;i++) {
sb.append("0");
}
sb.append(id);
return sb.toString();
}
private Time parseTime(String time) {
try {
int hour = Integer.parseInt(time.substring(0, 2));
int min = Integer.parseInt(time.substring(2, 4));
boolean halfMin = time.length()>4 && 'H' == time.charAt(4);
return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0));
}catch(RuntimeException e) {
log.error("For time \"{}\":{}", time, e.getMessage(), e);
}
return null;
}
private String parseSTime(String time) {
if("H".equals(time))
return "00:00:30";
int min = Integer.parseInt(time.substring(0,1));
boolean half = time.length()>1 && 'H' == time.charAt(1);
return "00:0"+min+(half?":30":":00");
}
private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setTime(pos, parseTime(time));
}
}
private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{
if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME);
}else {
stmt.setString(pos, parseSTime(time));
}
}
@Override @Override
public void run() { public void run() {
try( try(
Connection conn = DatabaseManager.getInstance().getConnection(); Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?)"); PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?,?)");
PreparedStatement sSeg = conn.prepareStatement("INSERT INTO train_segment VALUES (?,?)") PreparedStatement sDetail = conn.prepareStatement("INSERT INTO train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
PreparedStatement sError = conn.prepareStatement("INSERT INTO train_error VALUES (?,?)");
PreparedStatement sLoca = conn.prepareStatement("INSERT INTO train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
){ ){
int currentSize = 0;
int commitCount = 0;
int trainCount = 0;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
while(true) { while(true) {
JSONObject obj = queue.poll(); JSONObject obj = queue.poll();
if(shutdown && (queue.isEmpty()||obj==null)) { if(shutdown && (queue.isEmpty()||obj==null)) {
sMain.executeBatch(); sMain.executeBatch();
sSeg.executeBatch(); sDetail.executeBatch();
sError.executeBatch();
sLoca.executeBatch();
conn.commit(); conn.commit();
log.info("Train schedule committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); log.info("Train schedule committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount);
break; break;
}else if(obj==null) { }else if(obj==null) {
log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
log.debug("Woke up.");
continue; continue;
} }
synchronized(FileLoader.SYNC_TOKEN) { synchronized(FileLoader.SYNC_TOKEN) {
FileLoader.SYNC_TOKEN.notifyAll(); FileLoader.SYNC_TOKEN.notifyAll();
} }
//Parse data //Parse data
String trainUID = Integer.toHexString(++trainCount); String trainUID = getTUID();
sMain.setString(1, trainUID); sMain.setString(1, trainUID);
sMain.setString(2, obj.optString("CIF_train_uid")); sMain.setString(2, obj.optString("CIF_train_uid"));
try { try {
@ -99,21 +166,73 @@ public class ScheduleHandler implements Runnable{
} }
sMain.setString(5, obj.optString("schedule_days_runs")); sMain.setString(5, obj.optString("schedule_days_runs"));
sMain.setString(6, obj.optString("train_status")); sMain.setString(6, obj.optString("train_status"));
sMain.setString(7, obj.optString("atoc_code"));
sMain.addBatch(); sMain.addBatch();
currentSize++; currentSize++;
// sSeg.setString(1, trainUID); JSONObject detail = obj.optJSONObject("schedule_segment");
// sSeg.setString(2, obj.optJSONObject("schedule_segment").toString()); setString(sDetail, 1, trainUID);
// sSeg.addBatch(); setString(sDetail, 2, detail.optString("CIF_train_category"));
setString(sDetail, 3, detail.optString("signalling_id"));
setString(sDetail, 4, detail.optString("CIF_headcode"));
setString(sDetail, 5, detail.optString("CIF_course_indicator"));
setString(sDetail, 6, detail.optString("CIF_train_service_code"));
setString(sDetail, 7, detail.optString("CIF_business_sector"));
setString(sDetail, 8, detail.optString("CIF_power_type"));
setString(sDetail, 9, detail.optString("CIF_timing_load"));
setInt (sDetail,10, detail.optString("CIF_speed"));
setString(sDetail,11, detail.optString("CIF_operating_characteristics"));
setString(sDetail,12, detail.optString("CIF_sleepers"));
setString(sDetail,13, detail.optString("CIF_reservations"));
setString(sDetail,14, detail.optString("CIF_connection_indicator"));
setString(sDetail,15, detail.optString("CIF_catering_code"));
setString(sDetail,16, detail.optString("CIF_service_branding"));
sDetail.addBatch();
currentSize++;
JSONArray locations = detail.optJSONArray("schedule_location");
if(locations!=null) {
for(int i=0;i<locations.length();i++) {
JSONObject location = locations.getJSONObject(i);
String type = location.getString("location_type");
if("LO".equals(type) || "LI".equals(type) || "LT".equals(type)) {
setString(sLoca, 1, trainUID);
sLoca.setInt(2, i);
setString(sLoca, 3, location.optString("tiploc_code"));
setTime (sLoca, 4, location.optString("arrival"));
setTime (sLoca, 5, location.optString("public_arrival"));
setTime (sLoca, 6, location.optString("departure"));
setTime (sLoca, 7, location.optString("public_departure"));
setTime (sLoca, 8, location.optString("pass"));
setString(sLoca, 9, location.optString("platform"));
setString(sLoca,10, location.optString("line"));
setString(sLoca,11, location.optString("path"));
setSTime (sLoca,12, location.optString("engineering_allowance"));
setSTime (sLoca,13, location.optString("pathing_allowance"));
setSTime (sLoca,14, location.optString("performance_allowance"));
sLoca.addBatch();
currentSize++;
}else {
log.fatal("Unknown entry type {}", type);
System.exit(3);
}
}
}else {
sError.setString(1, trainUID);
sError.setString(2, obj.toString());
// sError.addBatch();
// currentSize++; // currentSize++;
}
if(currentSize>=Constants.BATCH_SIZE) { if(currentSize>=Constants.BATCH_SIZE) {
sMain.executeBatch(); sMain.executeBatch();
sSeg.executeBatch(); sDetail.executeBatch();
sError.executeBatch();
sLoca.executeBatch();
conn.commit(); conn.commit();
log.info("Train schedule committed {} entry. Commit #{}", currentSize, ++commitCount); log.info("Train schedule committed {} entry. Commit #{}", currentSize, ++commitCount);
currentSize = 0; currentSize = 0;
} }
if(queue.isEmpty()) { if(queue.isEmpty()) {
log.debug("Empty queue. Wait for more entries"); // log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
try { try {
SYNC_TOKEN.wait(); SYNC_TOKEN.wait();
@ -121,6 +240,7 @@ public class ScheduleHandler implements Runnable{
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
} }
// log.debug("Woke up.");
} }
} }
}catch(SQLException e) { }catch(SQLException e) {
@ -133,6 +253,20 @@ public class ScheduleHandler implements Runnable{
} }
private void setInt(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null || "".equals(string))
stmt.setNull(i, Types.INTEGER);
else
stmt.setInt(i, Integer.parseInt(string));
}
private void setString(PreparedStatement stmt, int i, String string) throws SQLException{
if(string==null)
stmt.setNull(i, Types.VARCHAR);
else
stmt.setString(i, string);
}
public int getQueueSize() { public int getQueueSize() {
return queue.size(); return queue.size();
} }

23
src/org/leolo/rail/nrd/TiplocHandler.java

@ -27,7 +27,12 @@ public class TiplocHandler implements Runnable{
} }
public void add(JSONObject obj) { public void add(JSONObject obj) {
if(obj!=null) {
// log.debug("Size {} >>", queue.size());
queue.add(obj); queue.add(obj);
}else {
log.warn("Trying to add a null object!");
}
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll(); SYNC_TOKEN.notifyAll();
} }
@ -43,11 +48,12 @@ public class TiplocHandler implements Runnable{
public void shutdownAndWait() { public void shutdownAndWait() {
shutdown(); shutdown();
while(true) { while(true) {
log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null);
if(terminated) { if(terminated) {
break; break;
} }
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
log.debug("Waiting for termination.");
try { try {
SYNC_TOKEN.notifyAll(); SYNC_TOKEN.notifyAll();
SYNC_TOKEN.wait(1000); SYNC_TOKEN.wait(1000);
@ -57,6 +63,8 @@ public class TiplocHandler implements Runnable{
} }
} }
} }
private int currentSize = 0;
private int commitCount = 0;
@Override @Override
public void run() { public void run() {
@ -64,8 +72,6 @@ public class TiplocHandler implements Runnable{
Connection conn = DatabaseManager.getInstance().getConnection(); Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)") PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)")
){ ){
int currentSize = 0;
int commitCount = 0;
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
while(true) { while(true) {
JSONObject obj = queue.poll(); JSONObject obj = queue.poll();
@ -75,6 +81,15 @@ public class TiplocHandler implements Runnable{
log.info("TIPLOC committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); log.info("TIPLOC committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount);
break; break;
}else if(obj==null) { }else if(obj==null) {
log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) {
try {
SYNC_TOKEN.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
log.debug("Woke up.");
continue; continue;
} }
synchronized(FileLoader.SYNC_TOKEN) { synchronized(FileLoader.SYNC_TOKEN) {
@ -111,7 +126,7 @@ public class TiplocHandler implements Runnable{
currentSize = 0; currentSize = 0;
} }
if(queue.isEmpty()) { if(queue.isEmpty()) {
log.debug("Empty queue. Wait for more entries"); // log.debug("Empty queue. Wait for more entries");
synchronized(SYNC_TOKEN) { synchronized(SYNC_TOKEN) {
try { try {
SYNC_TOKEN.wait(); SYNC_TOKEN.wait();

Loading…
Cancel
Save