Browse Source

Changes TUID generation method and reduce the amount of logs

develop
LO Kam Tao Leo 4 years ago
parent
commit
f49d900fdc
  1. 2
      src/org/leolo/rail/nrd/AssoicationProcessor.java
  2. 14
      src/org/leolo/rail/nrd/FileLoader.java
  3. 65
      src/org/leolo/rail/nrd/ScheduleProcessor.java
  4. 2
      src/org/leolo/rail/nrd/TiplocProcessor.java

2
src/org/leolo/rail/nrd/AssoicationProcessor.java

@ -70,7 +70,7 @@ public class AssoicationProcessor implements Runnable {
} }
pstmt.executeBatch(); pstmt.executeBatch();
conn.commit(); conn.commit();
log.info("Batch committed."); log.info("Batch {} committed.", fileName.getName());
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} catch (IOException e) { } catch (IOException e) {

14
src/org/leolo/rail/nrd/FileLoader.java

@ -94,6 +94,8 @@ public class FileLoader {
countA++; countA++;
if(countA%Constants.BATCH_SIZE==0) { if(countA%Constants.BATCH_SIZE==0) {
ass.close(); ass.close();
if(batchA%50==0)
log.info("Created batch for assoication");
ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA))))); ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA)))));
} }
}else if("TiplocV1".equals(objectType)){ }else if("TiplocV1".equals(objectType)){
@ -101,6 +103,8 @@ public class FileLoader {
countT++; countT++;
if(countT%Constants.BATCH_SIZE==0) { if(countT%Constants.BATCH_SIZE==0) {
tip.close(); tip.close();
if(batchT%50==0)
log.info("Created batch for TIPLOC");
tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT))))); tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT)))));
} }
}else if("JsonScheduleV1".equals(objectType)){ }else if("JsonScheduleV1".equals(objectType)){
@ -108,6 +112,8 @@ public class FileLoader {
countS++; countS++;
if(countS%Constants.BATCH_SIZE==0) { if(countS%Constants.BATCH_SIZE==0) {
sch.close(); sch.close();
if(batchS%50==0)
log.info("Created batch for schedule");
sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS))))); sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS)))));
} }
}else if("EOF".equals(objectType)){ }else if("EOF".equals(objectType)){
@ -121,6 +127,10 @@ public class FileLoader {
ass.close(); ass.close();
tip.close(); tip.close();
sch.close(); sch.close();
log.info("Created batch for assoication");
log.info("Created batch for TIPLOC");
log.info("Created batch for schedule");
log.info("Total count : {}", count); log.info("Total count : {}", count);
}catch(ZipException e) { }catch(ZipException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
@ -138,17 +148,21 @@ public class FileLoader {
log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS); log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS);
ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(Integer.parseInt(ConfigurationManager.getInstance().getProperty("thread", "20"))); ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(Integer.parseInt(ConfigurationManager.getInstance().getProperty("thread", "20")));
for(int i=0;i<=batchA;i++) { for(int i=0;i<=batchA;i++) {
if(i%50==0)
log.info("Queued Assoication - {}", i); log.info("Queued Assoication - {}", i);
threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i))); threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i)));
} }
for(int i=0;i<=batchT;i++) { for(int i=0;i<=batchT;i++) {
if(i%50==0)
log.info("Queued Tiploc - {}", i); log.info("Queued Tiploc - {}", i);
threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i))); threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i)));
} }
for(int i=0;i<=batchS;i++) { for(int i=0;i<=batchS;i++) {
if(i%50==0)
log.info("Queued Schedule - {}", i); log.info("Queued Schedule - {}", i);
threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i))); threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i)));
} }
log.info("All entry queued.");
threadPool.shutdown(); threadPool.shutdown();
try { try {
threadPool.awaitTermination(3, TimeUnit.HOURS); threadPool.awaitTermination(3, TimeUnit.HOURS);

65
src/org/leolo/rail/nrd/ScheduleProcessor.java

@ -12,8 +12,14 @@ import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Time; import java.sql.Time;
import java.sql.Types; import java.sql.Types;
import java.text.DateFormat;
import java.text.FieldPosition;
import java.text.ParseException; import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import java.util.Random;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -25,6 +31,30 @@ public class ScheduleProcessor implements Runnable {
private File fileName; private File fileName;
private Logger log = LogManager.getLogger(getClass()); private Logger log = LogManager.getLogger(getClass());
private static TUIDDateFormat tdf = new TUIDDateFormat();
public static final String SEQ_ID = "0123456789ABCDEFGHJKLMNPRSTUVWXY";
private static Random r = new Random();
private static class TUIDDateFormat extends DateFormat{
public static final String MONTH_ID = "MBTQPHSONDUE";
public static final String DAY_ID = "0123456789ABCDEFGHJKLMNPRSTUVWX";
@Override
public StringBuffer format(Date arg0, StringBuffer arg1, FieldPosition arg2) {
// TODO Auto-generated method stub
arg1.append(MONTH_ID.charAt(arg0.getMonth()));
arg1.append(DAY_ID.charAt(arg0.getDate()-1));
return arg1;
}
@Override
public Date parse(String arg0, ParsePosition arg1) {
throw new UnsupportedOperationException();
}
}
ScheduleProcessor(File fileName){ ScheduleProcessor(File fileName){
this.fileName = fileName; this.fileName = fileName;
@ -41,12 +71,27 @@ public class ScheduleProcessor implements Runnable {
return sb.toString(); return sb.toString();
} }
private Time parseTime(String time) { private static Hashtable<String, Integer> uidMap = new Hashtable<>();
private static synchronized String getTUID(String uid, Date startDate, Date endDate) {
String buid = uid+tdf.format(startDate)+tdf.format(endDate);
int count = 0;
if(uidMap.containsKey(buid)) {
count = uidMap.get(buid);
count +=1;
uidMap.put(buid, count);
}else {
uidMap.put(buid, 0);
}
return buid+SEQ_ID.charAt(count);
}
private String parseTime(String time) {
try { try {
int hour = Integer.parseInt(time.substring(0, 2)); int hour = Integer.parseInt(time.substring(0, 2));
int min = Integer.parseInt(time.substring(2, 4)); int min = Integer.parseInt(time.substring(2, 4));
boolean halfMin = time.length()>4 && 'H' == time.charAt(4); boolean halfMin = time.length()>4 && 'H' == time.charAt(4);
return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0)); return hour+":"+min+(halfMin?":30":":00");
}catch(RuntimeException e) { }catch(RuntimeException e) {
log.error("For time \"{}\":{}", time, e.getMessage(), e); log.error("For time \"{}\":{}", time, e.getMessage(), e);
} }
@ -65,7 +110,7 @@ public class ScheduleProcessor implements Runnable {
if(time==null||"".equals(time)) { if(time==null||"".equals(time)) {
stmt.setNull(pos, Types.TIME); stmt.setNull(pos, Types.TIME);
}else { }else {
stmt.setTime(pos, parseTime(time)); stmt.setString(pos, parseTime(time));
} }
} }
@ -94,16 +139,20 @@ public class ScheduleProcessor implements Runnable {
if(line==null) { if(line==null) {
break; break;
} }
String trainUID = getTUID(); String trainUID;
JSONObject obj = new JSONObject(line);sMain.setString(1, trainUID); JSONObject obj = new JSONObject(line);
sMain.setString(2, obj.optString("CIF_train_uid")); sMain.setString(2, obj.optString("CIF_train_uid"));
try { try {
sMain.setDate(3, new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime())); java.sql.Date startDate = new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime());
sMain.setDate(4, new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime())); java.sql.Date endDate = new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime());
trainUID = getTUID(obj.optString("CIF_train_uid"), startDate, endDate);
sMain.setDate(3, startDate);
sMain.setDate(4, endDate);
} catch (ParseException e1) { } catch (ParseException e1) {
log.error(e1.getMessage(), e1); log.error(e1.getMessage(), e1);
continue; continue;
} }
sMain.setString(1, trainUID);
sMain.setString(5, obj.optString("schedule_days_runs")); sMain.setString(5, obj.optString("schedule_days_runs"));
sMain.setString(6, obj.optString("train_status")); sMain.setString(6, obj.optString("train_status"));
sMain.setString(7, obj.optString("atoc_code")); sMain.setString(7, obj.optString("atoc_code"));
@ -165,7 +214,7 @@ public class ScheduleProcessor implements Runnable {
sError.executeBatch(); sError.executeBatch();
sLoca.executeBatch(); sLoca.executeBatch();
conn.commit(); conn.commit();
log.info("Batch committed."); log.info("Batch {} committed.", fileName.getName());
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} catch (IOException e) { } catch (IOException e) {

2
src/org/leolo/rail/nrd/TiplocProcessor.java

@ -69,7 +69,7 @@ public class TiplocProcessor implements Runnable {
} }
pstmt.executeBatch(); pstmt.executeBatch();
conn.commit(); conn.commit();
log.info("Batch committed."); log.info("Batch {} committed.", fileName.getName());
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} catch (IOException e) { } catch (IOException e) {

Loading…
Cancel
Save