应用举例
❖Push data
从数据源将数据推送到集群中时,需要编写 usertask 程序解析原始数据,再将解析后的数据导入数据集市。具体请参考下文的 Parse CSV 示例。
❖Pull data
通过调度任务中的作业从数据源拉取数据。如图所示,在调度任务中新建一个作业,任务类型选择"增量导入数据",并选择一个已建好的数据集;执行后即可将该数据集的结果导入数据集市。
❖增删 Cloud File 和 Cloud Folder
下面以 RemoveFolderTask 为例演示删除操作。
import g5.Setting;
import g5.dc.impl.DCUtil;
import g5.dc.node.GNodeResult;
import g5.dc.node.RemoveFolderTask;
import g5.sv.db.DBService;
/**
 * Stand-alone example: boots the engine against a local bi.home, waits for
 * console input, then runs a {@link RemoveFolderTask} for the cloud
 * {@code testCloud} and prints its result.
 */
public class TestRemoveFolderTask {
  public static void main(String[] args) throws Exception {
    final String cloudName = "testCloud";
    final String home = "E:/source/yonghongsoft/source/home";
    System.setProperty("bi.home", home);

    // Stop early unless a license is configured and accepted by KeyUtil.
    String key = Setting.get("license");
    boolean licensed = key != null && g5.util.KeyUtil.init(key);
    if (!licensed) {
      System.out.println("_________key invalid____");
      return;
    }

    Setting.setBoolean("dc.naming.check.file", false);
    DCUtil.init(true);
    DBService.get();

    // Block until console input arrives; two bytes are consumed.
    System.err.println("Press any key to generate...");
    for (int i = 0; i < 2; i++) {
      System.in.read();
    }

    GNodeResult outcome = new RemoveFolderTask(cloudName).exec();
    if (outcome == null) {
      return;
    }
    System.out.println("____result:_________" + outcome.state() + "|" + outcome.toString());
  }
}
❖Parse CSV
ParserCsv.java
import g5.dc.fs.s.GSFolder;
import g5.dc.node.impl.GSFolderC;
import g5.grid.impl.GridUtil;
import g5.io.IOUtil;
import g5.meta.BCol;
import g5.meta.DType;
import g5.meta.QCol;
import g5.qry.impl.QColumnSeg;
import g5.qry.impl.QGrid;
import g5.qry.impl.QUtil;
import g5.sched.TaskContext;
import java.io.*;
import java.text.ParseException;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.StringUtils;
import com.ibm.icu.text.DateFormat;
import com.ibm.icu.text.SimpleDateFormat;
/**
 * Parses comma-separated text files into a {@link QGrid} and hands each grid
 * to {@link ImportCsv} for storage in the cloud folder.
 *
 * <p>Each non-blank line must contain exactly six comma-separated fields, in
 * the order of {@link #headers}: NUMBER, KEYWORD, HOST, TIME
 * ({@code yyyy-MM-dd HH:mm:ss}), PROVINCE, CITY. Lines with a different field
 * count, or whose TIME field cannot be parsed, are logged and skipped. A
 * NUMBER that is not a plain digit string fitting in a {@code long} is stored
 * as {@code null}.
 *
 * <p>An instance should not be shared between threads: it holds a mutable
 * {@link SimpleDateFormat}.
 */
public final class ParserCsv {
  /**
   * @param csvJob  the task whose cloud helpers store the parsed grids
   * @param context the schedule context passed through to {@code csvJob}
   */
  public ParserCsv(ImportCsv csvJob, TaskContext context) {
    _context = context;
    _job = csvJob;
  }

  /** Sets the name of the cloud folder the parsed grids are written to. */
  public void setGFilename(String foldername) {
    folder = foldername;
  }

  /**
   * Records the log timestamp supplied by the schedule. The value is stored
   * but not otherwise used by this class.
   */
  public void setFileDate(String m_ts) {
    this.logTs = m_ts;
  }

  /**
   * Parses every file under {@code path}, recursing into sub-folders.
   *
   * @throws RuntimeException if {@code path} does not exist or is not a folder
   */
  public void parseDirectory(String path) throws Exception {
    File file = new File(path);
    if (!file.exists()) {
      throw new RuntimeException("The given path: " + path + " not found the folder");
    }
    parseDirectory(file);
  }

  /** Parses the text data under the given folder, recursing into sub-folders. */
  private void parseDirectory(File file) throws Exception {
    if (!file.isDirectory()) {
      throw new RuntimeException("The given path: " + file + " not found the folder");
    }
    File[] files = file.listFiles();
    // listFiles() reports an I/O error by returning null rather than throwing.
    if (files == null) {
      throw new IOException("Failed to list the folder: " + file);
    }
    for (File fl : files) {
      if (fl.isDirectory()) {
        parseDirectory(fl);
      }
      else {
        parse(fl);
      }
    }
  }

  /** Parses a single file given by name; see {@link #parse(File)}. */
  public void parse(String fileName) throws Exception {
    parse(new File(fileName));
  }

  /**
   * Parses one file into a grid and stores it through {@link #addGrid}.
   * A file that cannot be opened is logged and skipped.
   */
  public void parse(File file) throws Exception {
    try {
      log.log(Level.INFO, "start to parse file:___ " + file);
      long ts = System.currentTimeMillis();
      BufferedReader readerInput = new BufferedReader(new FileReader(file));
      QGrid grid = null;
      long count = 0;     // records added to the grid
      int r = 0;          // lines read, including blank ones
      int invalidRow = 0; // non-blank lines without exactly six fields
      int parseRow = 0;   // lines whose TIME field could not be parsed
      try {
        grid = (QGrid) QGrid.create(headers, types, cols);
        String inputString;
        while ((inputString = readerInput.readLine()) != null) {
          if (inputString.trim().length() > 0) {
            String[] str = StringUtils.splitPreserveAllTokens(inputString, ",");
            if (str.length != 6) {
              // @temp humming, invalid data need to store the other file
              log.log(Level.WARNING, "find invalid " + r + " record: " + inputString);
              invalidRow++;
            }
            else if (addRecord(grid, str)) {
              count++;
            }
            else {
              log.log(Level.WARNING, "find unparsable " + r + " record: " + inputString);
              parseRow++;
            }
          }
          r++;
        }
        grid.complete();
      }
      catch (Exception ex) {
        // addGrid() has not taken ownership of the grid yet, so release it here.
        if (grid != null) {
          grid.dispose();
        }
        throw ex;
      }
      finally {
        readerInput.close();
      }
      log.log(Level.WARNING, "Add data for " + file + ", count=" + count + " over , cost="
          + (System.currentTimeMillis() - ts));
      log.log(Level.WARNING, "Invalid row record: " + invalidRow + " for total: " + r);
      log.log(Level.WARNING, "Parse erro row record: " + parseRow + " for total: " + r);
      GridUtil.print(grid, 10);
      // addGrid() disposes the grid in every case.
      addGrid(grid, System.currentTimeMillis());
    }
    catch (FileNotFoundException e) {
      log.log(Level.WARNING, "Failed to open file: " + file, e);
    }
  }

  /**
   * Appends one six-field record to {@code grid}, one value per column.
   *
   * @return {@code false}, without adding anything, if the TIME field cannot
   *         be parsed
   */
  private boolean addRecord(QGrid grid, String[] str) throws Exception {
    Date date;
    try {
      // Parse TIME before adding any column so that a bad value cannot leave
      // the earlier columns of this row already added to the grid.
      date = format.parse(str[3]);
    }
    catch (ParseException e) {
      return false;
    }
    grid.add(0, parseNumber(str[0]));
    grid.add(1, str[1]);
    grid.add(2, str[2]);
    grid.add(3, date);
    grid.add(4, str[4]);
    grid.add(5, str[5]);
    return true;
  }

  /**
   * Converts {@code str} to a {@code Long}, or returns {@code null} when it is
   * not a plain digit string or does not fit in a {@code long}.
   */
  private Long parseNumber(String str) {
    if (!validNumber(str)) {
      return null;
    }
    try {
      return Long.valueOf(str);
    }
    catch (NumberFormatException e) {
      // Digits only, but too many of them for a long.
      return null;
    }
  }

  /**
   * Returns {@code true} when {@code str} is non-null, non-empty and consists
   * only of digit characters (so a leading sign is rejected).
   */
  private boolean validNumber(String str) {
    if ((str != null) && (str.length() > 0)) {
      for (int i = str.length(); --i >= 0;) {
        if (!Character.isDigit(str.charAt(i))) {
          return false;
        }
      }
      return true;
    }
    return false;
  }

  /**
   * Stores {@code grid} in the cloud folder as file {@code folder + ts}, in
   * append mode. The grid and the temporary split directory are released
   * afterwards in every case, including when the grid is empty.
   */
  private void addGrid(QGrid grid, long ts) throws Exception {
    GSFolderC holder = null;
    File dir = null;
    GSFolder gsFolder = null;
    try {
      // empty grid: nothing to store
      if (!grid.exists(2, -1, true)) {
        return;
      }
      holder = new GSFolderC();
      holder.setFolder(folder);
      holder.setFile(folder + ts);
      holder.setAppend(true);
      dir = IOUtil.getTempFolder("CloudTask");
      _job._split(_context, dir, holder.getFile(), grid, QUtil.metaCols(grid));
      log.log(Level.INFO, ".............split datagrid.......");
      // initialize GSFolder instance
      gsFolder = _job._initFolder(holder, holder.getFolder(), dir, QUtil.metaCols(grid));
      // check whether the cloud is active
      _job._checkActivity();
      // remove the folder in GFS if it exists and we are not appending
      if (!holder.isAppend()) {
        _job._removeFolder("CloudTask", holder.getFolder());
      }
      // add into GFS
      _job._addFolder("CloudTask", gsFolder, holder.isAppend());
    }
    finally {
      if (gsFolder != null) {
        gsFolder.dispose();
      }
      if (grid != null) {
        // close, release mem
        grid.dispose();
      }
      // dir is only set after holder has been created and configured.
      if (dir != null) {
        boolean removed = IOUtil.remove(dir);
        if (!removed) {
          log.log(Level.WARNING, "Failed to remove directory '" + dir.getAbsolutePath()
              + "' for GSFolder '" + holder.getFolder() + "'!");
        }
      }
    }
  }

  /** Column names, in the order the fields appear on each CSV line. */
  public static String[] headers = {"NUMBER", "KEYWORD", "HOST", "TIME", "PROVINCE", "CITY"};
  /** Column segment types, parallel to {@link #headers}. */
  public static byte[] types = {QColumnSeg.DYNAMIC_LONG,
      QColumnSeg.DEF_STRING, QColumnSeg.DEF_STRING,
      QColumnSeg.DYNAMIC_DATE_TIME, QColumnSeg.DEF_STRING,
      QColumnSeg.DEF_STRING
  };
  /** Column metadata, parallel to {@link #headers}. */
  public static QCol[] cols = {new BCol("NUMBER", DType.LONG),
      new BCol("KEYWORD", DType.STRING, true), new BCol("HOST", DType.STRING, true),
      new BCol("TIME", DType.DATE_TIME), new BCol("PROVINCE", DType.STRING, true),
      new BCol("CITY", DType.STRING, true)
  };
  private static final Logger log = Logger.getLogger(ParserCsv.class.getSimpleName());
  // Format of the TIME field; SimpleDateFormat is mutable, hence one per instance.
  DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  private String folder = "CSV";
  private String logTs;
  private final TaskContext _context;
  private final ImportCsv _job;
}
ImportCsv.java
import g5.DataGrid;
import g5.dc.fs.s.GSFolder;
import g5.dc.impl.DCUtil;
import g5.dc.node.impl.GSFolderC;
import g5.meta.*;
import g5.sched.*;
import g5.sched.jobs.AbstractCloudTask;
import g5.util.KeyUtil;
import java.io.File;
/**
 * Scheduled task that imports the CSV files under a local folder into a cloud
 * folder, using {@link ParserCsv} to parse them.
 *
 * <p>Schedule parameters: {@code gs_Folder} (target cloud folder) and
 * {@code csv_Folder} (local source folder). The optional {@code log_ts} is
 * forwarded to the parser. The {@code _xxx} methods re-expose the
 * {@link AbstractCloudTask} helpers so that {@code ParserCsv} can call them.
 */
public class ImportCsv extends AbstractCloudTask {
  /**
   * Declares the schedule parameters. {@code gs_Folder} carries the value list
   * {@code {"theta"}} (NOTE(review): presumably its default; confirm).
   */
  @Override
  public Parameter[] getParams() throws Exception {
    return new Parameter[] {
        new Parameter("gs_Folder", DType.STRING, new String[] {"theta"}),
        new Parameter("csv_Folder", DType.STRING)};
  }

  /**
   * Parses every file under {@code csv_Folder} into the cloud folder named by
   * {@code gs_Folder}. When either parameter, or its value, is null, the
   * corresponding field default is used instead.
   */
  @Override
  public void run(TaskContext context) throws Exception {
    ParserCsv p = new ParserCsv(this, context);
    // get name of cloud folder from schedule
    gs_Folder = stringParam(context, "gs_Folder", gs_Folder);
    // get path of csv folder from schedule
    csv_folder = stringParam(context, "csv_Folder", csv_folder);
    p.setGFilename(gs_Folder);
    try {
      p.setFileDate((String) context.getParam("log_ts").getValue());
    }
    catch (Exception ignored) {
      // log_ts is optional: a missing parameter or a non-String value is skipped on purpose.
    }
    System.err.println("run:________ " + gs_Folder + ", src=" + csv_folder);
    try {
      p.parseDirectory(csv_folder);
    }
    catch (Exception ex) {
      ex.printStackTrace();
      throw ex;
    }
  }

  /**
   * Returns the string form of schedule parameter {@code name}, or
   * {@code fallback} when the parameter or its value is null.
   */
  private static String stringParam(TaskContext context, String name, String fallback) {
    Object value = null;
    if (context.getParam(name) != null) {
      value = context.getParam(name).getValue();
    }
    return value == null ? fallback : value.toString();
  }

  /** Splits {@code jGrid} into {@code dir}; delegates to {@code AbstractCloudTask.split}. */
  public void _split(TaskContext context, File dir, String file, DataGrid jGrid, QCol cols[]) throws Exception {
    super.split(context, dir, file, jGrid, cols);
  }

  /** Removes a cloud folder; delegates to {@code AbstractCloudTask.removeFolder}. */
  public void _removeFolder(String task, String folder) throws Exception {
    super.removeFolder(task, folder);
  }

  /** Builds the GSFolder for {@code holder}; delegates to {@code AbstractCloudTask.initFolder}. */
  public GSFolder _initFolder(GSFolderC holder, String folder, File dir, QCol cols[]) {
    return super.initFolder(holder, folder, dir, cols);
  }

  /** Checks cloud activity; delegates to {@code AbstractCloudTask.checkActivity}. */
  public void _checkActivity() {
    super.checkActivity();
  }

  /** Adds a cloud folder; delegates to {@code AbstractCloudTask.addFolder}. */
  public void _addFolder(String task, GSFolder folder, boolean isAppend) throws Exception {
    super.addFolder(task, folder, isAppend);
  }

  /** Test entrance: runs the task with an empty schedule context. */
  public static void main(String[] args) {
    System.setProperty("bi.home", bihome);
    // initialize license
    if (!KeyUtil.installLicense()) {
      System.out.println("invalid license!");
      return;
    }
    DCUtil.init(true);
    ImportCsv xJob = new ImportCsv();
    try {
      xJob.run(new SchedContext());
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }

  private static String bihome = "D:/YH/Yonghong/bihome";
  private String csv_folder = "E:/data_csv";
  private String gs_Folder = "CSV";
}
❖运行 Query 获取结果
方法 1:执行 bihome 中已建好的 Query。
import g5.DataGrid;
import g5.dc.impl.DCUtil;
import g5.grid.impl.GridUtil;
import g5.sched.*;
import g5.util.KeyUtil;
/**
 * Example: runs the saved query {@code hive1.sqry} from bi.home and prints the
 * first 10 rows of its result.
 */
public class runQry {
  public static void main(String[] args) {
    System.setProperty("bi.home", bihome);
    // A license must be installed before the engine is used.
    boolean licensed = KeyUtil.installLicense();
    if (!licensed) {
      System.out.println("invalid license!");
      return;
    }
    DCUtil.init(true);
    try {
      DataGrid result = runQry("hive1.sqry", new SchedContext());
      GridUtil.print(result, 10);
    }
    catch (Exception failure) {
      failure.printStackTrace();
    }
  }

  /** Fetches the data of the query at {@code path} through {@code IRuiDataCache}. */
  private static DataGrid runQry(String path, TaskContext context) throws Exception {
    DataGrid data = IRuiDataCache.getData(path, context);
    return data;
  }

  private static String bihome = "D:/YH/Yonghong/bihome";
}
方法 2:通过 JDBC 连接已建好的数据集市数据集并执行查询。
import g5.dc.impl.DCUtil;
import g5.util.KeyUtil;
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;
/**
 * Example: queries a data-mart dataset through the JDBC driver
 * {@code g5.dc.jdbc.GDriver} and prints the first column of every row.
 */
public class runCloudQry {
  public static void main(String[] args) throws SQLException {
    System.setProperty("bi.home", bihome);
    // initialize license
    if (!KeyUtil.installLicense()) {
      System.out.println("invalid license!");
      return;
    }
    DCUtil.init(true);
    try {
      // Load the driver class so DriverManager can find it.
      Class.forName(driverName);
    }
    catch (ClassNotFoundException e) {
      e.printStackTrace();
      System.exit(1);
    }
    Connection con = null;
    Statement stmt = null;
    ResultSet res = null;
    try {
      con = DriverManager.getConnection("jdbc:yonghong:z", "admin", "g5");
      stmt = con.createStatement();
      String sql = "select * from \"test.clqry\"";
      System.out.println("Running1: " + sql);
      res = stmt.executeQuery(sql);
      while (res.next()) {
        System.out.println(String.valueOf(res.getString(1)));
      }
    }
    finally {
      // Close in reverse order of creation; each close is guarded separately so
      // that one failure does not leak the remaining resources.
      try {
        if (res != null) {
          res.close();
        }
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
      try {
        if (stmt != null) {
          stmt.close();
        }
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
      try {
        if (con != null) {
          con.close();
        }
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
    }
  }

  private static String bihome = "D:/YH/Yonghong/bihome";
  private static String driverName = "g5.dc.jdbc.GDriver";
}
❖Join Query
import g5.DataGrid;
import g5.dc.impl.DCUtil;
import g5.grid.JoinGrid;
import g5.grid.impl.GridUtil;
import g5.sched.SchedContext;
import g5.sched.TaskContext;
import g5.util.KeyUtil;
/**
 * Example: runs two saved queries and joins their results with
 * {@link JoinGrid}, printing the first 10 rows.
 */
public class joinQuery {
  public static void main(String[] args) throws Exception {
    System.setProperty("bi.home", bihome);
    // initialize license
    if (!KeyUtil.installLicense()) {
      System.out.println("invalid license!");
      return;
    }
    // Initialize the data cloud before running any query, as the runQry example does.
    DCUtil.init(true);
    DataGrid grid1 = runQry("test1.sqry", new SchedContext());
    DataGrid grid2 = runQry("test2.sqry", new SchedContext());
    byte hint = JoinGrid.FINAL_JOIN;
    byte op = JoinGrid.JOIN;
    // Key column indexes of each side (column 0 of both grids).
    short[] lkeys = {0};
    short[] rkeys = {0};
    // NOTE(review): presumably the column indexes of grid1 / grid2 kept in the result; confirm.
    short[] lcols = {0, 1, 2, 3, 4, 5};
    short[] rcols = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14};
    JoinGrid g = JoinGrid.create(hint, grid1, grid2, lcols, rcols, lkeys, rkeys, op, null);
    GridUtil.print(g, 10);
  }

  /** Fetches the data of the query at {@code path} through {@code IRuiDataCache}. */
  private static DataGrid runQry(String path, TaskContext context) throws Exception {
    return IRuiDataCache.getData(path, context);
  }

  private static String bihome = "D:/YH/Yonghong/bihome";
}