How to work with real-time streaming data/logs using Spark Streaming?
I am new to Spark and Scala.
I want to implement a real-time Spark consumer that reads network logs from a Kafka publisher on a per-minute basis [fetching around 1 GB of JSON log lines per minute] and stores the aggregated values in Elasticsearch.
The aggregation is based on a few values [like bytes_in, bytes_out etc.] grouped by a composite key [like client MAC, client IP, server MAC, server IP etc.].
The Spark consumer I have written is:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.sql.functions.{count, sum}
import org.elasticsearch.spark.sql._

object LogsAnalyzerScalaCS {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("logs-aggregation")
    sparkConf.set("es.nodes", "my ip address")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes.discovery", "false")
    val elasticResource = "conrec_1min/1minute"
    // 30-second micro-batches
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val zkQuorum = "my zk quorum ips:2181"
    val consumerGroupId = "logsconsumer"
    val topics = "logs"
    val topicMap = topics.split(",").map((_, 3)).toMap
    // Receiver-based Kafka stream; the message value holds the JSON log line
    val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
    val logJSON = json.map(_._2)
    try {
      logJSON.foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
          import sqlContext.implicits._
          val df = sqlContext.read.json(rdd)
          // Aggregate this batch by the composite key
          val groupedData = df.groupBy("id", "start_time_formated", "l2_c", "l3_c",
              "l4_c", "l2_s", "l3_s", "l4_s")
            .agg(count("f_id") as "total_f",
              sum("p_out") as "total_p_out", sum("p_in") as "total_p_in",
              sum("b_out") as "total_b_out", sum("b_in") as "total_b_in",
              sum("duration") as "total_duration")
          val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
          // Write this batch's aggregates to Elasticsearch
          dataForES.saveToEs(elasticResource)
          dataForES.show()
        }
      })
    } catch {
      case e: Exception => print("Exception has occurred : " + e.getMessage)
    }
    ssc.start()
    ssc.awaitTermination()
  }

  object SQLContextSingleton {
    @transient private var instance: org.apache.spark.sql.SQLContext = _

    def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext = {
      if (instance == null) {
        instance = new org.apache.spark.sql.SQLContext(sparkContext)
      }
      instance
    }
  }
}
First of all, I would like to know whether my approach is correct or not [considering that I need 1-minute log aggregation].
There seem to be a few issues with this code:
- This consumer pulls data from the Kafka broker every 30 seconds and saves the final aggregation of those 30 seconds of data to Elasticsearch, hence increasing the number of rows in Elasticsearch per unique key [at least 2 entries per minute]. The UI tool [let's say Kibana] then needs to aggregate further. If I increase the polling time from 30 seconds to 60 seconds, the aggregation takes a lot of time and hence it no longer stays real time.
- I want to implement it in such a way that only 1 row per key is saved in Elasticsearch. Hence I want to keep aggregating as long as new values for a key keep arriving in the dataset pulled from the Kafka broker [on a per-minute basis]. From some googling I found that this can be achieved with the groupByKey() and updateStateByKey() functions, but I am not able to work out how to use them in my case [should I convert the JSON log lines into a string of flat log line values and then use these functions on that, roughly as in the first sketch after this list]? If I do use these functions, when should I save the final aggregated values to Elasticsearch?
- Is there any other way of achieving it [for example at the Elasticsearch write step, as in the second sketch below]?
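To make the second point clearer, this is roughly how far I have got with updateStateByKey(). It is only a rough sketch and not tested end to end: the Totals case class, the checkpoint path, the 60-second batch interval and the use of json4s for parsing the JSON log lines are all my own guesses; only the field names come from my actual data.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods.parse

// Running totals kept per composite key (names are my own invention).
case class Totals(totalF: Long, totalPOut: Long, totalPIn: Long,
                  totalBOut: Long, totalBIn: Long, totalDuration: Long)

object StatefulLogsAggregation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("logs-aggregation-stateful")
    // 60-second batches, so one micro-batch covers one minute of logs.
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // updateStateByKey needs checkpointing to keep state across batches.
    ssc.checkpoint("hdfs:///tmp/logs-aggregation-checkpoint")

    val zkQuorum = "my zk quorum ips:2181"
    val topicMap = "logs".split(",").map((_, 3)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, "logsconsumer", topicMap).map(_._2)

    // Flatten each JSON log line into (compositeKey, metrics).
    val keyed = lines.map { line =>
      implicit val formats: Formats = DefaultFormats
      val j = parse(line)
      val key = ((j \ "id").extract[String], (j \ "start_time_formated").extract[String],
        (j \ "l2_c").extract[String], (j \ "l3_c").extract[String], (j \ "l4_c").extract[String],
        (j \ "l2_s").extract[String], (j \ "l3_s").extract[String], (j \ "l4_s").extract[String])
      val metrics = Totals(1L,
        (j \ "p_out").extract[Long], (j \ "p_in").extract[Long],
        (j \ "b_out").extract[Long], (j \ "b_in").extract[Long],
        (j \ "duration").extract[Long])
      (key, metrics)
    }

    def merge(a: Totals, b: Totals): Totals =
      Totals(a.totalF + b.totalF, a.totalPOut + b.totalPOut, a.totalPIn + b.totalPIn,
        a.totalBOut + b.totalBOut, a.totalBIn + b.totalBIn, a.totalDuration + b.totalDuration)

    // Fold this batch's records into the running total kept for each key.
    val running = keyed.updateStateByKey[Totals] { (newValues: Seq[Totals], state: Option[Totals]) =>
      (state.toSeq ++ newValues).reduceOption(merge) // returning None would drop the key
    }

    // One up-to-date row per composite key every minute; each batch this could be
    // converted to a DataFrame and written with saveToEs(...).
    running.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Even with something like this I am not sure when a key's running total should be written to Elasticsearch and when the key should be dropped from the state (by returning None from the update function).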
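For the third point, the only alternative I could think of is to make the Elasticsearch write itself collapse the rows, by deriving the document id from the composite key. This is again only a sketch: es.mapping.id and es.write.operation are options I found in the elasticsearch-hadoop configuration documentation, while the docId column and the helper object are my own invention. As far as I understand it, this would only overwrite the previous 30-second partial aggregate rather than add to it, so it would give correct totals only if each batch already covers the full minute.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.concat_ws
import org.elasticsearch.spark.sql._

object OneRowPerKeyWriter {
  // Intended as a drop-in replacement for the dataForES.saveToEs(elasticResource)
  // line in the job above; "docId" is a column name I made up.
  def saveOneRowPerKey(dataForES: DataFrame, elasticResource: String): Unit = {
    // Deterministic document id built from the composite key.
    val withId = dataForES.withColumn("docId",
      concat_ws("_", dataForES("id"), dataForES("start_time"),
        dataForES("l2_c"), dataForES("l3_c"), dataForES("l4_c"),
        dataForES("l2_s"), dataForES("l3_s"), dataForES("l4_s")))

    withId.saveToEs(elasticResource, Map(
      "es.mapping.id"      -> "docId",   // use docId as the Elasticsearch document _id
      "es.write.operation" -> "upsert"   // update an existing document instead of adding a row
    ))
  }
}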
Your quick help will be appreciated.
Regards,
Bhupesh