How do we perform custom operations on WindowedStream in Flink?


I want to perform custom operations on a WindowedStream in Flink, such as computing an average. Only a limited set of predefined operations is available, like sum, min, max, etc.

    val windowedStream = valueStream
      .keyBy(0)
      .timeWindow(Time.minutes(5))
      .sum(2) // how do I change this to an average?

Suppose I want to find the average. How can I do that?

Flink does not have a built-in function to compute an average on a WindowedStream. You have to implement a custom WindowFunction for this.
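To make the role of a WindowFunction concrete, here is a minimal plain-Scala model with no Flink dependency. It only mirrors the shape of a Scala window function (key, window, all buffered inputs, collector); `FullWindowModel`, `WindowBounds`, and the `ListBuffer` standing in for Flink's `Collector` are hypothetical names for illustration, not Flink API. The point it shows: used on its own, the window function receives every element the window buffered for a key.

```scala
import scala.collection.mutable.ListBuffer

// Plain-Scala model (no Flink dependency) of a full-window function:
// the function is handed every element the window buffered for one key.
object FullWindowModel {
  // Hypothetical stand-in for Flink's TimeWindow: [start, end) in milliseconds.
  final case class WindowBounds(start: Long, end: Long)

  // Same shape as a Scala window function:
  // (key, window, all buffered inputs, collector) => Unit.
  // A ListBuffer plays the role of the Collector here (assumption for the sketch).
  def averageWindowFunction(
      key: String,
      window: WindowBounds,
      input: Iterable[(String, Double)],
      out: ListBuffer[(String, Double)]): Unit = {
    // All raw values must still be available when the window fires.
    val values = input.toSeq.map(_._2)
    out += ((key, values.sum / values.size))
  }
}
```

Because the whole window content is kept until the window fires, this approach works but holds every element in state, which is what the next paragraph's ReduceFunction avoids.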

The most efficient way is to implement a ReduceFunction that computes the count and the sum of the values you want to average, and a subsequent WindowFunction that takes the result of the ReduceFunction and computes the average. Using a ReduceFunction is more efficient because Flink applies it directly to the incoming values. Hence, it aggregates the values on the fly and does not collect them in the window. This significantly reduces the memory footprint of the window.
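The memory argument can be illustrated with a small plain-Scala simulation (no Flink dependency). `IncrementalWindow`, `onElement`, and `fire` are hypothetical names chosen for the sketch; they model, under that assumption, how a reduce applied on arrival keeps one (sum, count) per key instead of buffering every element, and how the final division happens only when the window fires.

```scala
import scala.collection.mutable

// Plain-Scala model (no Flink dependency) of eager, incremental aggregation
// for one open window: each value is merged into a per-key (sum, count) as it
// arrives and is then discarded, so state grows with the number of keys only.
final class IncrementalWindow {
  // key -> (running sum, running count)
  private val state = mutable.Map.empty[String, (Double, Long)]

  // Called once per incoming element, like a reduce applied on arrival.
  def onElement(key: String, value: Double): Unit =
    state(key) = state.get(key) match {
      case Some((sum, count)) => (sum + value, count + 1L) // merge into partial aggregate
      case None               => (value, 1L)               // first element for this key
    }

  // Called when the window fires: divide sum by count for each key
  // (the window-function step), then clear state for the next window.
  def fire(): Map[String, Double] = {
    val result = state.map { case (k, (sum, count)) => k -> sum / count }.toMap
    state.clear()
    result
  }

  // Number of accumulators held: one per key, not one per element.
  def stateSize: Int = state.size
}
```

For example, after four elements for two keys the model holds only two accumulators, no matter how many elements arrived.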

Since the output of a ReduceFunction has the same type as its input, you need to add a field for the count before applying the ReduceFunction.
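A short plain-Scala sketch (no Flink dependency) of why the extra count field matters. A same-typed reduce over `(String, Double)` can only combine two values into one value, so the tempting "average pairwise" reduce gives the wrong result; carrying `(key, sum, count)` and dividing once at the end does not. `WhyCountField` and its method names are hypothetical, chosen for illustration.

```scala
// Why the count field is needed (plain Scala, no Flink dependency).
object WhyCountField {
  type In  = (String, Double)
  type Acc = (String, Double, Long) // (key, running sum, running count)

  // Naive reduce that keeps the (String, Double) type by averaging pairwise.
  // This is NOT the mean of the window: earlier values get less weight.
  def naiveReduce(x: In, y: In): In = (x._1, (x._2 + y._2) / 2)

  // Step 1: append a count of 1 to every element, like map(x => (x._1, x._2, 1L)).
  def withCount(x: In): Acc = (x._1, x._2, 1L)

  // Step 2: same-typed reduce that adds up sums and counts.
  def sumAndCount(x: Acc, y: Acc): Acc = (x._1, x._2 + y._2, x._3 + y._3)

  // Step 3: divide once at the end, which is the window function's job.
  def average(window: Seq[In]): Double = {
    val (_, sum, count) = window.map(withCount).reduce(sumAndCount)
    sum / count
  }
}
```

For the values 1, 2, 3, 6 the pairwise reduce yields 4.125, while the sum/count approach yields the true mean of 3.0.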

Something like the following should do the trick:

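    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    val valueStream: DataStream[(String, Double)] = ???

    val r: DataStream[(String, Double)] = valueStream
      // append 1L for counting
      .map(x => (x._1, x._2, 1L))
      // key and window the stream
      .keyBy(0)
      .timeWindow(Time.minutes(5))
      .apply(
        // ReduceFunction (compute sum and count)
        (x: (String, Double, Long), y: (String, Double, Long)) =>
          (x._1, x._2 + y._2, x._3 + y._3),
        // WindowFunction (compute the average)
        (key: Tuple, window: TimeWindow,
         input: Iterable[(String, Double, Long)],
         out: Collector[(String, Double)]) => {
          // get the first (and only) value: the result of the ReduceFunction
          val x: (String, Double, Long) = input.iterator.next()
          // compute the average as sum / count
          out.collect((x._1, x._2 / x._3))
        }
      )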
