How MLSQL Stack Makes Stream Debugging Easier
Someone evaluating MLSQL Stack's streaming support remarked that debugging streams is actually quite hard. After working through it, we concluded that three things would help:
- Sample the latest fixed number of records from Kafka at any time
- Print debugging results (the sink) to the web console
- Have the streaming program infer the JSON schema automatically (plain Spark cannot do this today)
With these three in place, debugging really did become much simpler.
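To make the third point concrete: schema inference boils down to sampling a few records and merging the field types they expose. A hypothetical Python sketch of that idea (my own simplification; the function name and type-widening rules are not MLSQL's actual implementation):

```python
import json

def infer_schema(sample_lines):
    """Infer a flat field -> type-name mapping from sampled JSON records."""
    schema = {}
    for line in sample_lines:
        record = json.loads(line)
        for field, value in record.items():
            # Map Python types to rough SQL-ish type names.
            type_name = {int: "long", float: "double", str: "string",
                         bool: "boolean"}.get(type(value), "string")
            # Keep the widest type seen so far (long widens to double).
            if schema.get(field) == "double" and type_name == "long":
                type_name = "double"
            schema[field] = type_name
    return schema

samples = [
    '{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}',
    '{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}',
]
print(infer_schema(samples))
# {'x': 'long', 'y': 'long', 'z': 'long', 'dataType': 'string'}
```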
Workflow
First, I created a script called kaf_write.mlsql to make it easy to write data into Kafka:
set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;
select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where
kafka.bootstrap.servers="127.0.0.1:9092";
Now, every time I run this script, the data is written into Kafka.
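The select to_json(struct(*)) line packs all columns of each row into a single JSON string, which becomes the Kafka message value. In plain Python terms it behaves roughly like this (a conceptual sketch, not Spark's actual code path):

```python
import json

# Each row from table1, with columns as dict keys.
rows = [
    {"x": 100, "y": 200, "z": 200, "dataType": "A group"},
    {"x": 120, "y": 100, "z": 260, "dataType": "B group"},
]

# to_json(struct(*)) serializes all columns of each row into one string.
values = [json.dumps(row) for row in rows]
print(values[0])
# {"x": 100, "y": 200, "z": 200, "dataType": "A group"}
```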
Next, I want to check whether the data actually arrived, and what it looks like:
!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;
This command samples 10 records from the Kafka broker at 127.0.0.1:9092, topic wow. The output looks like this:
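Conceptually, sampling the latest N records just means seeking near the end of each partition and reading forward. A hypothetical sketch of that logic over an in-memory partition (real kafkaTool talks to the broker, of course):

```python
# Hypothetical sketch of "sample the latest N records" from one topic
# partition. An in-memory list stands in for the partition log, where the
# list index is the record offset.

def sample_latest(partition_records, n):
    """Return up to n records closest to the end of the partition."""
    end_offset = len(partition_records)
    start_offset = max(0, end_offset - n)
    return partition_records[start_offset:end_offset]

partition = [f'{{"x": {i}}}' for i in range(100)]
print(sample_latest(partition, 10))  # the 10 most recent records
```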

No problems there. Next, I wrote a very simple streaming program:
-- the stream name, should be unique.
set streamName="streamExample";
-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;
load kafka.`wow` options
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;
select * from newkafkatable1 as table21;
-- print in webConsole instead of terminal console.
save append table21
as webConsole.``
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";
The result looks like this:

We can also see the real-time output in the terminal.
Additional Notes
MLSQL Stack has two more features that make streaming especially pleasant: you can register an HTTP callback for stream events, and you can post-process the stream's output with batch SQL before writing it to storage. See the following script:
-- the stream name, should be unique.
set streamName="streamExample";
-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';
-- load data as table
load jsonStr.`data` as datasource;
-- convert table as stream source
load mockStream.`datasource` options
stepSizeRange="0-3"
as newkafkatable1;
-- cast the binary kafka value to a string column
select cast(value as string) as k from newkafkatable1
as table21;
!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- post-process the result with batch SQL in a custom sink.
save append table21
as custom.``
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`;
'''
and checkpointLocation="/tmp/cpl15";
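The !callback line posts stream lifecycle events (started, progress, terminated) to the HTTP endpoint you name. A minimal receiver for such callbacks could look like this Python sketch (hypothetical; the exact event payload depends on the MLSQL version):

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

received_events = []

class CallbackHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the JSON event body the stream engine posts to us.
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        received_events.append(json.loads(body))
        self.send_response(200)
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # keep the console quiet

def start_server(port=9002):
    # Bind and serve in a background thread, matching the !callback URL above.
    server = HTTPServer(("127.0.0.1", port), CallbackHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Start the server before launching the stream; each event then lands in received_events as a parsed JSON object.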
Summary
That's all for this article. I hope it provides a useful reference for your study or work.