Application Example
❖Push data
Push data from a data source to the cluster. A user task program needs to be written that parses the original data and then imports the parsed data into the data mart. Please refer to the Parse CSV example.
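A minimal sketch of such a user task, modeled on the ImportCsv class in the Parse CSV example below (the class name MyPushTask and the parameter name src_Folder are illustrative only):
import g5.meta.DType;
import g5.sched.Parameter;
import g5.sched.TaskContext;
import g5.sched.jobs.AbstractCloudTask;

public class MyPushTask extends AbstractCloudTask {
    @Override
    public Parameter[] getParams() throws Exception {
        // parameters that the schedule prompts for when the task runs
        return new Parameter[] {new Parameter("src_Folder", DType.STRING)};
    }

    @Override
    public void run(TaskContext context) throws Exception {
        // path of the original data, passed in from the schedule
        String src = context.getParam("src_Folder").getValue().toString();
        System.err.println("push from: " + src);
        // parse the original data under src and import the result into the data mart,
        // as shown in the Parse CSV example (ParserCsv / ImportCsv) below
    }
}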
❖Pull data
Pull data from a data source by using a job in scheduled tasks. Create a job in scheduled tasks, choose Incremental Import Data to Data Mart as the job type, and select an existing data set; after the job runs, the data set result is imported into the data mart. Please refer to the Parse CSV example.
❖Add and delete Cloud File and Cloud Folder
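The example below deletes a cloud folder with RemoveFolderTask. Adding a cloud folder is shown in the Parse CSV example, where ImportCsv calls AbstractCloudTask.addFolder through its _addFolder helper.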
import g5.Setting;
import g5.dc.impl.DCUtil;
import g5.dc.node.GNodeResult;
import g5.dc.node.RemoveFolderTask;
import g5.sv.db.DBService;
public class TestRemoveFolderTask {
    public static void main(String[] args) throws Exception {
        String bihome = "E:/source/yonghongsoft/source/home";
        String cloudName = "testCloud";
        System.setProperty("bi.home", bihome);
        // initialize and verify the license
        String license = Setting.get("license");
        if(license == null || !g5.util.KeyUtil.init(license)) {
            System.out.println("_________key invalid____");
            return;
        }
        Setting.setBoolean("dc.naming.check.file", false);
        DCUtil.init(true);
        DBService.get();
        // wait for user input before the folder is removed
        System.err.println("Press any key to continue...");
        System.in.read();
        System.in.read();
        // remove the cloud folder from the data mart
        RemoveFolderTask task = new RemoveFolderTask(cloudName);
        GNodeResult result = task.exec();
        if(result != null) {
            System.out.println("____result:_________" + result.state() + "|" + result.toString());
        }
    }
}
❖Parse CSV
ParserCsv.java
import g5.dc.fs.s.GSFolder;
import g5.dc.node.impl.GSFolderC;
import g5.grid.impl.GridUtil;
import g5.io.IOUtil;
import g5.meta.BCol;
import g5.meta.DType;
import g5.meta.QCol;
import g5.qry.impl.QColumnSeg;
import g5.qry.impl.QGrid;
import g5.qry.impl.QUtil;
import g5.sched.TaskContext;
import java.io.*;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.StringUtils;
import com.ibm.icu.text.DateFormat;
import com.ibm.icu.text.SimpleDateFormat;
/**
* Parse the disk file to generate datagrid.
*/
public final class ParserCsv {
    public ParserCsv(ImportCsv csvJob, TaskContext context) {
        _context = context;
        _job = csvJob;
    }

    public void setGFilename(String foldername) {
        folder = foldername;
    }

    public void setFileDate(String m_ts) {
        this.log_ts = m_ts;
    }

    public void parseDirectory(String path) throws Exception {
        File file = new File(path);
        if(!file.exists()) {
            throw new RuntimeException("The given path: " + path + " does not exist");
        }
        parseDirectory(file);
    }

    /**
     * Parse the text data under the given folder.
     */
    private void parseDirectory(File file) throws Exception {
        if(!file.isDirectory()) {
            throw new RuntimeException("The given path: " + file + " is not a folder");
        }
        File[] files = file.listFiles();
        for(File fl : files) {
            if(fl.isDirectory()) {
                // recurse into sub-folders
                parseDirectory(fl);
            }
            else {
                parse(fl);
            }
        }
    }

    public void parse(String fileName) throws Exception {
        File file = new File(fileName);
        parse(file);
    }
    public void parse(File file) throws Exception {
        BufferedReader readerInput = null;
        try {
            log.log(Level.INFO, "start to parse file:___ " + file);
            long ts = System.currentTimeMillis();
            FileReader input = new FileReader(file);
            readerInput = new BufferedReader(input);
            String inputString = readerInput.readLine();
            // map the text column index to grid index
            long count = 0;
            int r = 0;
            int invalidRow = 0;
            int parseRow = 0;
            QGrid grid = (QGrid) QGrid.create(headers, types, cols);
            while(inputString != null) {
                if(inputString.trim().length() > 0) {
                    String[] str = StringUtils.splitPreserveAllTokens(inputString, ",");
                    if(str.length == 6) {
                        grid.add(0, validNumber(str[0]) ? Long.parseLong(str[0]) : null);
                        grid.add(1, str[1]);
                        grid.add(2, str[2]);
                        try {
                            Date date = format.parse(str[3]);
                            grid.add(3, date);
                        }
                        catch(Exception pe) {
                            // date could not be parsed, count it and store a null value
                            grid.add(3, null);
                            parseRow++;
                        }
                        grid.add(4, str[4]);
                        grid.add(5, str[5]);
                    }
                    else {
                        // TODO: invalid rows should be written to a separate file
                        log.log(Level.WARNING, "find invalid " + r + " record: " + inputString);
                        invalidRow++;
                    }
                }
                inputString = readerInput.readLine();
                r++;
            }
            grid.complete();
            log.log(Level.WARNING, "Add data for " + file + ", count=" + (count++) + " over , cost=" +
                (System.currentTimeMillis() - ts));
            log.log(Level.WARNING, "Invalid row record: " + invalidRow + " for total: " + r);
            log.log(Level.WARNING, "Parse error row record: " + parseRow + " for total: " + r);
            GridUtil.print(grid, 10);
            addGrid(grid, System.currentTimeMillis());
            grid = (QGrid) QGrid.create(headers, types, cols);
        }
        catch(FileNotFoundException e) {
            e.printStackTrace();
        }
        finally {
            // release the file handle
            if(readerInput != null) {
                readerInput.close();
            }
        }
    }
    /**
     * Check whether the given string is a valid (digits-only) number.
     */
    private boolean validNumber(String str) {
        if((str != null) && (str.length() > 0)) {
            for(int i = str.length(); --i >= 0;) {
                if(!Character.isDigit(str.charAt(i))) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }
    private void addGrid(QGrid grid, long ts) throws Exception {
        // empty grid
        if(!grid.exists(2, -1, true)) {
            return;
        }
        GSFolderC holder = new GSFolderC();
        holder.setFolder(folder);
        holder.setFile(folder + ts);
        holder.setAppend(true);
        File dir = null;
        GSFolder gsFolder = null;
        try {
            dir = IOUtil.getTempFolder("CloudTask");
            _job._split(_context, dir, holder.getFile(), grid, QUtil.metaCols(grid));
            log.log(Level.INFO, ".............split datagrid.......");
            // initialize the GSFolder instance
            gsFolder = _job._initFolder(holder, holder.getFolder(), dir, QUtil.metaCols(grid));
            // check whether the cloud name is active or not
            _job._checkActivity();
            // remove the folder in GFS if it exists and append mode is off
            if(!holder.isAppend()) {
                _job._removeFolder("CloudTask", holder.getFolder());
            }
            // add into GFS
            _job._addFolder("CloudTask", gsFolder, holder.isAppend());
        }
        finally {
            if(gsFolder != null) {
                gsFolder.dispose();
            }
            if(grid != null) {
                // close, release memory
                grid.dispose();
                grid = null;
            }
            if(dir != null) {
                boolean removed = IOUtil.remove(dir);
                if(!removed) {
                    log.log(Level.WARNING, "Failed to remove directory '" + dir.getAbsolutePath() + "' for GSFolder '" + holder.getFolder() + "'!");
                }
            }
        }
    }
    public static String[] headers = {"NUMBER", "KEYWORD", "HOST", "TIME", "PROVINCE", "CITY"};
    // the column at index 4 (PTMSI) stores the number in base 16 rather than base 10
    public static byte[] types = {QColumnSeg.DYNAMIC_LONG,
        QColumnSeg.DEF_STRING, QColumnSeg.DEF_STRING,
        QColumnSeg.DYNAMIC_DATE_TIME, QColumnSeg.DEF_STRING,
        QColumnSeg.DEF_STRING
    };
    public static QCol[] cols = {new BCol("NUMBER", DType.LONG),
        new BCol("KEYWORD", DType.STRING, true), new BCol("HOST", DType.STRING, true),
        new BCol("TIME", DType.DATE_TIME), new BCol("PROVINCE", DType.STRING, true),
        new BCol("CITY", DType.STRING, true)
    };

    private static final Logger log = Logger.getLogger(ParserCsv.class.getSimpleName());
    DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private String folder = "CSV";
    // file date passed in from the schedule, see setFileDate
    private String log_ts;
    private TaskContext _context;
    private ImportCsv _job;
}
ImportCsv.java
import g5.DataGrid;
import g5.dc.fs.s.GSFolder;
import g5.dc.impl.DCUtil;
import g5.dc.node.impl.GSFolderC;
import g5.meta.*;
import g5.sched.*;
import g5.sched.jobs.AbstractCloudTask;
import g5.util.KeyUtil;
import java.io.File;
/**
 * Import data to the cloud.
 */
public class ImportCsv extends AbstractCloudTask {
    @Override
    public Parameter[] getParams() throws Exception {
        return new Parameter[] {new Parameter("gs_Folder", DType.STRING, new String[] {"theta"}),
            new Parameter("csv_Folder", DType.STRING)};
    }

    @Override
    public void run(TaskContext context) throws Exception {
        ParserCsv p = new ParserCsv(this, context);
        // get the name of the cloud folder from the schedule
        gs_Folder = context.getParam("gs_Folder").getValue().toString();
        // get the path of the csv folder from the schedule
        csv_folder = context.getParam("csv_Folder").getValue().toString();
        p.setGFilename(gs_Folder);
        try {
            p.setFileDate((String) context.getParam("log_ts").getValue());
        }
        catch(Exception ex) {
            // the log_ts parameter is optional, ignore it
        }
        System.err.println("run:________ " + gs_Folder + ", src=" + csv_folder);
        try {
            p.parseDirectory(csv_folder);
        }
        catch(Exception ex) {
            ex.printStackTrace();
            throw ex;
        }
    }
    /**
     * Split the grid.
     */
    public void _split(TaskContext context, File dir, String file, DataGrid jGrid, QCol cols[]) throws Exception {
        super.split(context, dir, file, jGrid, cols);
    }

    /**
     * Remove folder.
     */
    public void _removeFolder(String task, String folder) throws Exception {
        super.removeFolder(task, folder);
    }

    /**
     * Init folder.
     */
    public GSFolder _initFolder(GSFolderC holder, String folder, File dir, QCol cols[]) {
        return super.initFolder(holder, folder, dir, cols);
    }

    /**
     * Check cloud activity.
     */
    public void _checkActivity() {
        super.checkActivity();
    }

    /**
     * Add cloud folder.
     */
    public void _addFolder(String task, GSFolder folder, boolean isAppend) throws Exception {
        super.addFolder(task, folder, isAppend);
    }
    /**
     * Test main entrance.
     */
    public static void main(String[] args) {
        System.setProperty("bi.home", bihome);
        // initialize license
        if(!KeyUtil.installLicense()) {
            System.out.println("invalid license!");
            return;
        }
        DCUtil.init(true);
        ImportCsv xJob = new ImportCsv();
        try {
            xJob.run(new SchedContext());
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }

    private static String bihome = "D:/YH/Yonghong/bihome";
    private String csv_folder = "E:/data_csv";
    private String gs_Folder = "CSV";
}
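In this example ImportCsv is the scheduled cloud task: it reads the cloud folder name and the CSV directory from the schedule parameters and delegates to ParserCsv, which parses each CSV file into a QGrid, splits it into a temporary directory, and then publishes the result to the data mart through the AbstractCloudTask helpers (split, initFolder, addFolder).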
❖Run Query to obtain results
Method 1: execute a query that has been created and saved in bihome.
import g5.DataGrid;
import g5.dc.impl.DCUtil;
import g5.grid.impl.GridUtil;
import g5.sched.*;
import g5.util.KeyUtil;
public class runQry {
    public static void main(String[] args) {
        System.setProperty("bi.home", bihome);
        // initialize license
        if(!KeyUtil.installLicense()) {
            System.out.println("invalid license!");
            return;
        }
        DCUtil.init(true);
        try {
            // run query
            DataGrid grid = runQry("hive1.sqry", new SchedContext());
            GridUtil.print(grid, 10);
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }

    private static DataGrid runQry(String path, TaskContext context) throws Exception {
        return IRuiDataCache.getData(path, context);
    }

    private static String bihome = "D:/YH/Yonghong/bihome";
}
Method 2: connect to an existing data mart data set through JDBC.
import g5.dc.impl.DCUtil;
import g5.util.KeyUtil;
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;
public class runCloudQry {
    public static void main(String[] args) throws SQLException {
        System.setProperty("bi.home", bihome);
        // initialize license
        if(!KeyUtil.installLicense()) {
            System.out.println("invalid license!");
            return;
        }
        DCUtil.init(true);
        // load the Yonghong JDBC driver
        try {
            Class.forName(driverName);
        }
        catch(ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        Connection con = null;
        try {
            con = DriverManager.getConnection("jdbc:yonghong:z", "admin", "g5");
            Statement stmt = con.createStatement();
            String sql = "select * from \"test.clqry\"";
            System.out.println("Running: " + sql);
            ResultSet res = stmt.executeQuery(sql);
            while(res.next()) {
                System.out.println(String.valueOf(res.getString(1)));
            }
        }
        finally {
            try {
                if(con != null) {
                    con.close();
                }
            }
            catch(Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    private static String bihome = "D:/YH/Yonghong/bihome";
    private static String driverName = "g5.dc.jdbc.GDriver";
}
❖Join Query
import g5.DataGrid;
import g5.dc.impl.DCUtil;
import g5.grid.JoinGrid;
import g5.grid.impl.GridUtil;
import g5.sched.SchedContext;
import g5.sched.TaskContext;
import g5.util.KeyUtil;
public class joinQuery {
    public static void main(String[] args) throws Exception {
        System.setProperty("bi.home", bihome);
        // initialize license
        if(!KeyUtil.installLicense()) {
            System.out.println("invalid license!");
            return;
        }
        DCUtil.init(true);
        // run the two saved queries to be joined
        DataGrid grid1 = runQry("test1.sqry", new SchedContext());
        DataGrid grid2 = runQry("test2.sqry", new SchedContext());
        JoinGrid g = null;
        byte hint = JoinGrid.FINAL_JOIN;
        byte op = JoinGrid.JOIN;
        short lkeys[] = {0};
        short rkeys[] = {0};
        short lcols[] = {0, 1, 2, 3, 4, 5};
        short rcols[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14};
        g = JoinGrid.create(hint, grid1, grid2, lcols, rcols, lkeys, rkeys, op, null);
        GridUtil.print(g, 10);
    }

    private static DataGrid runQry(String path, TaskContext context) throws Exception {
        return IRuiDataCache.getData(path, context);
    }

    private static String bihome = "D:/YH/Yonghong/bihome";
}
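In this sketch, lkeys and rkeys hold the indexes of the join-key columns in grid1 and grid2, lcols and rcols select which columns of each grid are kept in the joined result, and hint and op are taken from the JoinGrid constants (FINAL_JOIN and JOIN here); consult the JoinGrid class for the other available join options.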