应用举例

<< Click to Display Table of Contents >>

当前位置:  数据集市 

应用举例

Previous page | Return to chapter overview | Next page

Push data

从数据源把数据推送到集群中时,需要编写 usertask 程序来解析原始数据,然后将解析后的数据导入数据集市。请参考下文的 Parse CSV 例子。

 

Pull data

用调度任务里的作业去数据源里拉数据。如图所示,在调度任务中新建一个作业,任务类型选择增量导入数据,选择一个建好的数据集,执行后可将数据集结果导入数据集市。

Job

 

增删Cloud File和Cloud Folder

import g5.Setting;

import g5.dc.impl.DCUtil;

import g5.dc.node.GNodeResult;

import g5.dc.node.RemoveFolderTask;

import g5.sv.db.DBService;

 

public class TestRemoveFolderTask {

public static void main(String[] args) throws Exception {

String bihome ="E:/source/yonghongsoft/source/home";

String cloudName = "testCloud";

System.setProperty("bi.home", bihome);

String license = Setting.get("license");

 

if(license == null || !g5.util.KeyUtil.init(license)) {

System.out.println("_________key invalid____");

return;

}

 

 

Setting.setBoolean("dc.naming.check.file", false);

DCUtil.init(true);

DBService.get();

 

System.err.println("Press any key to generate...");

System.in.read();

System.in.read();

RemoveFolderTask task = new RemoveFolderTask(cloudName);

GNodeResult result = task.exec();

 

if(result != null) {

System.out.println("____result:_________"+result.state()+"|"+result.toString());

}

}

}

 

Parse CSV

ParserCsv.java

import g5.dc.fs.s.GSFolder;

import g5.dc.node.impl.GSFolderC;

import g5.grid.impl.GridUtil;

import g5.io.IOUtil;

import g5.meta.BCol;

import g5.meta.DType;

import g5.meta.QCol;

import g5.qry.impl.QColumnSeg;

import g5.qry.impl.QGrid;

import g5.qry.impl.QUtil;

import g5.sched.TaskContext;

 

import java.io.*;

import java.util.Date;

import java.util.logging.Level;

import java.util.logging.Logger;

import org.apache.commons.lang.StringUtils;

import com.ibm.icu.text.DateFormat;

import com.ibm.icu.text.SimpleDateFormat;

 

 

 

/**

* Parse the disk file to generate datagrid.

*/

public final class ParserCsv {

public ParserCsv(ImportCsv csvJob, TaskContext context) {

  _context = context;

  _job = csvJob;

}

 

public void setGFilename(String foldername) {

  folder = foldername;

}

 

public void setFileDate(String m_ts) {

  this.log_ts = m_ts;

}

 

 

public void parseDirectory(String path) throws Exception {

File file = new File(path);

 

if(!file.exists()) {

throw new RuntimeException("The given path: " + path + " not found the folder");

}

 

parseDirectory(file);

}

 

 

/**

* Parse the text data under the given folder.

*/

private void parseDirectory(File file) throws Exception {

  if(!file.isDirectory()) {

throw new RuntimeException("The given path: " + file + " not found the folder");

}

 

File[] files = file.listFiles();

 

for(File fl : files) {

if(fl.isDirectory()) {

parseDirectory(fl);

}

else {

parse(fl);

}

}

}

 

 

 

public void parse(String fileName) throws Exception{

File file = new File(fileName);

parse(file);

}

 

 

 

public void parse(File file) throws Exception{

try {

log.log(Level.INFO, "start to parse file:___ " + file);

long ts = System.currentTimeMillis();

FileReader input = new FileReader(file);

BufferedReader readerInput = new BufferedReader(input);

String inputString = readerInput.readLine();

// map the text column index to grid index

long count = 0;

int r = 0;

int invalidRow = 0;

int parseRow = 0;

QGrid grid = (QGrid) QGrid.create(headers, types, cols);

 

 

while(inputString != null) {

if(inputString.trim().length() > 0) {

 

String[] str = StringUtils.splitPreserveAllTokens(inputString, ",");

if(str.length == 6) {

grid.add(0, validNumber(str[0]) ?

     Long.parseLong(str[0]) : null);

grid.add(1, str[1]);

grid.add(2, str[2]);

Date date=format.parse(str[3]);

grid.add(3, date);

grid.add(4, str[4]);

grid.add(5, str[5]);

}

else {

// @temp humming, invalid data need to store the other file

log.log(Level.WARNING, "find invalid " + r + " record: " + inputString);

invalidRow++;

}

}

 

inputString = readerInput.readLine();

r++;

}

 

grid.complete();

log.log(Level.WARNING, "Add data for " + file + ", count=" + (count++) + " over , cost=" +

(System.currentTimeMillis() - ts));

log.log(Level.WARNING, "Invalid row record: " + invalidRow + " for total: " + r);

log.log(Level.WARNING, "Parse erro row record: " + parseRow + " for total: " + r);

GridUtil.print(grid, 10);

addGrid(grid, System.currentTimeMillis());

grid = (QGrid) QGrid.create(headers, types, cols);

}

catch(FileNotFoundException e) {

e.printStackTrace();

}

}

 

 

/**

* Invalid number.

*/

private boolean validNumber(String str) {

if ((str != null) && (str.length() > 0)) {

for (int i = str.length(); --i >= 0;) {

if (!Character.isDigit(str.charAt(i))) {

return false;

}

}

return true;

}

return false;

}

 

private void addGrid(QGrid grid, long ts) throws Exception {

// empty grid

if(!grid.exists(2, -1, true)) {

return;

 

}

 

GSFolderC holder = new GSFolderC();

holder.setFolder(folder);

holder.setFile(folder + ts);

holder.setAppend(true);

File dir = null;

GSFolder folder = null;

 

try {

 

dir = IOUtil.getTempFolder("CloudTask");

_job._split(_context, dir, holder.getFile(), grid, QUtil.metaCols(grid));

log.log(Level.INFO, ".............split datagrid.......");

// initialize GSFolder instance

folder = _job._initFolder(holder, holder.getFolder(), dir, QUtil.metaCols(grid));

 

//check the cloud name is activity or not

_job._checkActivity();

 

//remove the folder in GFS if exists and not append

if (!holder.isAppend()) {

_job._removeFolder("CloudTask", holder.getFolder());

}

 

// add into GFS

_job._addFolder("CloudTask", folder, holder.isAppend());

 

}

finally {

if(folder != null) {

folder.dispose();

}

 

if(grid != null) {

// close, release mem

grid.dispose();

grid = null;

}

 

if(dir != null) {

boolean removed = IOUtil.remove(dir);

 

if(!removed) {

log.log(Level.WARNING, "Failed to remove directory '" + dir.getAbsolutePath() + "' for GSFolder '" + holder.getFolder() + "'!");

}

}

}

}

public static String[] headers = {"NUMBER","KEYWORD","HOST","TIME", "PROVINCE","CITY"};

// the index 4 PTMSI is store the number with 16 rather than 10

public static byte[] types = {QColumnSeg.DYNAMIC_LONG,

QColumnSeg.DEF_STRING, QColumnSeg.DEF_STRING,

QColumnSeg.DYNAMIC_DATE_TIME, QColumnSeg.DEF_STRING,

QColumnSeg.DEF_STRING

};

 

 

public static QCol[] cols = {new BCol("NUMBER", DType.LONG),

new BCol("KEYWORD", DType.STRING,true), new BCol("HOST", DType.STRING,true),

new BCol("TIME", DType.DATE_TIME),new BCol("PROVINCE", DType.STRING,true),

new BCol("CITY", DType.STRING,true)

};

 

private static final Logger log = Logger.getLogger(DianliParserDsv.class.getSimpleName());

DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

private String folder = "CSV";

private TaskContext _context;

private ImportCsv _job;

}

ImportCsv.java

import g5.DataGrid;

import g5.dc.fs.s.GSFolder;

import g5.dc.impl.DCUtil;

import g5.dc.node.impl.GSFolderC;

import g5.meta.*;

import g5.sched.*;

import g5.sched.jobs.AbstractCloudTask;

import g5.util.KeyUtil;

import java.io.File;

 

 

/**

* import data to cloud.

*/

public class ImportCsv extends AbstractCloudTask{

@Override

public Parameter[] getParams() throws Exception {

return new Parameter[] {new Parameter("gs_Folder", DType.STRING, new String[] {"theta"}),

new Parameter("csv_Folder", DType.STRING)};

}

 

@Override

public void run(TaskContext context) throws Exception {

ParserCsv p = new ParserCsv(this,context);

//get name of cloud folder from schedule

gs_Folder = context.getParam("gs_Folder").getValue().toString();

//get path of csv folder from schedule

csv_folder = context.getParam("csv_Folder").getValue().toString(); p.setGFilename(gs_Folder);

 

try {

p.setFileDate((String) context.getParam("log_ts").getValue());

 

}

 

catch(Exception ex) {

// ignore it

}

 

System.err.println("run:________ " + gs_Folder + ", src=" + csv_folder);

try {

p.parseDirectory(csv_folder);

}

catch(Exception ex) {

ex.printStackTrace();

throw ex;

}

}

 

 

 

/**

* Split the grid.

*/

public void _split(TaskContext context, File dir, String file, DataGrid jGrid, QCol cols[]) throws Exception {

super.split(context, dir, file, jGrid, cols);

}

 

 

 

/**

* Remove folder.

*/

public void _removeFolder(String task, String folder) throws Exception {

super.removeFolder(task, folder);

}

 

 

/**

* Init folder.

*/

public GSFolder _initFolder(GSFolderC holder, String folder, File dir, QCol cols[]) {

return super.initFolder(holder, folder, dir, cols);

}

 

 

 

/**

* Check cloud activity.

*/

public void _checkActivity() {

super.checkActivity();

}

 

 

 

/**

* Add cloud folder.

*/

public void _addFolder(String task, GSFolder folder, boolean isAppend) throws Exception {

super.addFolder(task, folder, isAppend);

}

 

/**

* Test main entrance.

*/

public static void main(String[] args) {

System.setProperty("bi.home", bihome);

 

// initialize license

if(!KeyUtil.installLicense()) {

System.out.println("invalid license!");

return;

}

 

DCUtil.init(true);

ImportCsv xJob = new ImportCsv();

 

try {

xJob.run(new SchedContext());

}

catch(Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

 

}

private static String bihome = "D:/YH/Yonghong/bihome";

private String csv_folder = "E:/data_csv";

private String gs_Folder = "CSV";

}

运行Query获得结果

方法 1:执行 bihome 中已经建好的 query

 

import g5.DataGrid;

import g5.dc.impl.DCUtil;

import g5.grid.impl.GridUtil;

import g5.sched.*;

import g5.util.KeyUtil;

 

public class runQry {

 

public static void main(String[] args) {

System.setProperty("bi.home", bihome);

 

// initialize license

if(!KeyUtil.installLicense()) {

System.out.println("invalid license!");

return;

}

 

 

 

DCUtil.init(true);

try {

//run query

DataGrid grid = runQry("hive1.sqry",new SchedContext());

GridUtil.print(grid, 10);

}

catch (Exception e) {

e.printStackTrace();

}

}

 

private static DataGrid runQry(String path, TaskContext context) throws Exception {

return IRuiDataCache.getData(path, context);

}

 

private static String bihome = "D:/YH/Yonghong/bihome";

}

 

方法 2:通过 JDBC 连接已经建好的数据集市数据集。

 

import g5.dc.impl.DCUtil;

import g5.util.KeyUtil;

 

import java.sql.SQLException;

import java.sql.Connection;

import java.sql.ResultSet;

import java.sql.Statement;

import java.sql.DriverManager;

 

public class runCloudQry {

 

public static void main(String[] args) throws SQLException {

System.setProperty("bi.home", bihome);

 

// initialize license

if(!KeyUtil.installLicense()) {

System.out.println("invalid license!");

return;

 

}

 

DCUtil.init(true);

try {

 

Class.forName(driverName);

 

}

catch (ClassNotFoundException e) {

// TODO Auto-generated catch block

e.printStackTrace();

System.exit(1);

}

 

Connection con = null;

 

try {

con = DriverManager.getConnection("jdbc:yonghong:z", "admin", "g5");

Statement stmt = con.createStatement();

 

String sql = "select * from \"test.clqry\"";

System.out.println("Running1: " + sql);

ResultSet res = stmt.executeQuery(sql);

 

while (res.next()) {

System.out.println(String.valueOf(res.getString(1)));

}

 

}

finally {

try {

if(con != null) {

con.close();

}

}

catch(Exception ex) {

ex.printStackTrace();

}

}

}

private static String bihome = "D:/YH/Yonghong/bihome";

private static String driverName = "g5.dc.jdbc.GDriver";

}

 

Join Query

import g5.DataGrid;

import g5.dc.impl.DCUtil;

import g5.grid.JoinGrid;

import g5.grid.impl.GridUtil;

import g5.sched.SchedContext;

import g5.sched.TaskContext;

import g5.util.KeyUtil;

 

public class joinQuery {

public static void main(String[] args) throws Exception {

System.setProperty("bi.home", bihome);

 

 

// initialize license

if(!KeyUtil.installLicense()) {

System.out.println("invalid license!");

return;

}

 

DataGrid grid1 = runQry("test1.sqry",new SchedContext());

DataGrid grid2 = runQry("test2.sqry",new SchedContext());

DCUtil.init(true);

JoinGrid g = null;

byte hint = JoinGrid.FINAL_JOIN;

byte op = JoinGrid.JOIN;

short lkeys[] = {0};

short rkeys[] = {0};

short lcols[] = {0,1,2,3,4,5};

short rcols[] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14};

g = JoinGrid.create(hint, grid1, grid2 , lcols, rcols, lkeys, rkeys, op, null);

GridUtil.print(g, 10);

}

 

private static DataGrid runQry(String path, TaskContext context) throws Exception {

return IRuiDataCache.getData(path, context);

}

 

private static String bihome = "D:/YH/Yonghong/bihome";

}