How to work with real-time streaming data/logs using Spark Streaming?


I am a newbie to Spark and Scala.

I want to implement a real-time Spark consumer that reads network logs from a Kafka publisher on a per-minute basis [fetching around 1 GB of JSON log lines/minute] and stores the aggregated values in Elasticsearch.

The aggregations are based on a few values [like bytes_in, bytes_out etc.] grouped by a composite key [like: client MAC, client IP, server MAC, server IP etc.].

The Spark consumer I have written is:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.{count, sum}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._

object LogsAnalyzerScalaCS {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("logs-aggregation")
    sparkConf.set("es.nodes", "my ip address")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes.discovery", "false")

    val elasticResource = "conrec_1min/1minute"
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val zkQuorum = "my zk quorum ips:2181"
    val consumerGroupId = "logsConsumer"
    val topics = "logs"
    val topicMap = topics.split(",").map((_, 3)).toMap

    // Receive (key, message) pairs from Kafka and keep only the JSON payload
    val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
    val logJson = json.map(_._2)

    try {
      logJson.foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
          import sqlContext.implicits._
          val df = sqlContext.read.json(rdd)
          // Aggregate each batch by the composite key and sum the metrics
          val groupedData = df
            .groupBy("id", "start_time_formated", "l2_c", "l3_c", "l4_c", "l2_s", "l3_s", "l4_s")
            .agg(
              count("f_id") as "total_f",
              sum("p_out") as "total_p_out",
              sum("p_in") as "total_p_in",
              sum("b_out") as "total_b_out",
              sum("b_in") as "total_b_in",
              sum("duration") as "total_duration")
          val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
          dataForES.saveToEs(elasticResource)
          dataForES.show()
        }
      })
    } catch {
      case e: Exception => print("Exception has occurred : " + e.getMessage)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  object SQLContextSingleton {
    @transient private var instance: org.apache.spark.sql.SQLContext = _
    def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext = {
      if (instance == null) {
        instance = new org.apache.spark.sql.SQLContext(sparkContext)
      }
      instance
    }
  }
}

First of all, I would like to know if this approach is correct or not [considering that I need 1-minute log aggregation].

There seem to be these issues with the code:

  1. This consumer pulls data from the Kafka broker every 30 seconds and saves the aggregation of that 30 seconds of data to Elasticsearch, which increases the number of rows in Elasticsearch per unique key [at least 2 entries per minute], so the UI tool [let's say Kibana] needs to do further aggregation. If I increase the polling interval from 30 sec to 60 sec, the aggregation takes a lot of time and hence it no longer remains real time. [See the first sketch after this list for a windowing idea I am considering.]
  2. I want to implement this in such a way that only 1 row per key is saved in Elasticsearch. Hence I want to keep aggregating until I stop getting new values for a key in the dataset being pulled from the Kafka broker [on a per-minute basis]. From some googling I found that this could be achieved with the groupByKey() and updateStateByKey() functions, but I am not able to work out how to use them in my case [should I convert the JSON log lines into a string of flat log-line values and then use these functions on that?]. If I do use these functions, when should I save the final aggregated values to Elasticsearch? [See the second sketch after this list.]
  3. Is there any other way of achieving this?
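For issue 1, this is the windowing idea I am considering: keep the 30-second batch interval but aggregate over a 60-second window, so that each write to Elasticsearch covers a full minute. This is only a rough, untested sketch that would replace the foreachRDD in my consumer above and reuses its names (logJson, elasticResource, SQLContextSingleton); I am not sure it is the right approach.

// Rough sketch only: aggregate over a 60-second window instead of each 30-second batch.
// Window duration = 60s, slide duration = 60s (both multiples of the 30s batch interval).
val windowedJson = logJson.window(Seconds(60), Seconds(60))

windowedJson.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    val df = sqlContext.read.json(rdd)
    // Same composite-key aggregation as before, now over one minute of data
    val groupedData = df
      .groupBy("id", "start_time_formated", "l2_c", "l3_c", "l4_c", "l2_s", "l3_s", "l4_s")
      .agg(
        count("f_id") as "total_f",
        sum("p_out") as "total_p_out",
        sum("p_in") as "total_p_in",
        sum("b_out") as "total_b_out",
        sum("b_in") as "total_b_in",
        sum("duration") as "total_duration")
    groupedData.withColumnRenamed("start_time_formated", "start_time").saveToEs(elasticResource)
  }
})

My worry is that this only changes how often the aggregation runs; a key whose log lines span two windows would still end up as two documents.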
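For issue 2, this is roughly how I imagine the updateStateByKey() approach would look. Again an untested sketch meant to sit inside the existing main: Metrics and parseLine are hypothetical helpers I would still have to write for my JSON format, the checkpoint path is just an example, and I am not sure where in this flow the write to Elasticsearch should happen.

// Untested sketch of a stateful per-key running total with updateStateByKey().
// Metrics and parseLine are placeholders for my actual fields and JSON parsing.
case class Metrics(pOut: Long, pIn: Long, bOut: Long, bIn: Long, duration: Long)

// Turn one JSON log line into (compositeKey, metrics); implementation still to be written
def parseLine(line: String): (String, Metrics) = ???

ssc.checkpoint("hdfs:///tmp/logs-aggregation-checkpoint") // required for stateful operations

// Merge the new values for a key into its running total across batches
val updateTotals = (newValues: Seq[Metrics], state: Option[Metrics]) =>
  (state.toSeq ++ newValues).reduceOption((a, b) =>
    Metrics(a.pOut + b.pOut, a.pIn + b.pIn, a.bOut + b.bOut, a.bIn + b.bIn,
      a.duration + b.duration))

// One (key, runningTotal) pair per key seen so far, updated every batch
val runningTotals = logJson.map(parseLine).updateStateByKey(updateTotals)

If I then write runningTotals to Elasticsearch every batch, using the composite key as the document id (es.mapping.id), I suppose the existing document would be overwritten rather than new rows piling up, but I would like confirmation that this is the right way to do it.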

Your quick help is appreciated.

Regards,
Bhupesh

