How do we perform custom operations on WindowedStream in Flink?
I want to perform custom operations, such as computing an average, on a WindowedStream in Flink. Only a limited set of predefined operations is available, such as sum, min, and max.
val windowedStream = valueStream
  .keyBy(0)
  .timeWindow(Time.minutes(5))
  .sum(2) // change to average?
Suppose I want to find the average. How can I do that?
Flink does not have a built-in function to compute the average on a WindowedStream. You have to implement a custom WindowFunction for this.
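For comparison, a plain WindowFunction receives every element of the window as an `Iterable` and can average them directly. The minimal sketch below reproduces only that averaging logic in plain Scala, without any Flink types; the object and method names are hypothetical, and the key is assumed to be the `String` in the first tuple field.

```scala
object NaiveWindowAverage {
  // Hypothetical helper: averages all buffered elements of one window for one key.
  // A WindowFunction sees the complete window contents, so every element
  // has to be kept in window state until the window fires.
  def averageWindow(key: String, input: Iterable[(String, Double)]): (String, Double) = {
    val values = input.map(_._2)
    (key, values.sum / values.size)
  }

  def main(args: Array[String]): Unit = {
    val window = Seq(("a", 1.0), ("a", 2.0), ("a", 6.0))
    println(averageWindow("a", window)) // (a,3.0)
  }
}
```

This works, but the whole window has to be buffered before the average can be computed.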
The most efficient way is to implement a ReduceFunction that computes the count and the sum of the values to average, followed by a WindowFunction that takes the result of the ReduceFunction and computes the average. Using a ReduceFunction is more efficient because Flink applies it directly to the incoming values. Hence, it aggregates the values on the fly and does not collect them in the window, which significantly reduces the memory footprint of the window.
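The memory difference can be illustrated without Flink. The following is a minimal sketch, assuming a hypothetical `IncrementalAverage` class that is not part of any Flink API: it folds each incoming value into a single (sum, count) accumulator per key, roughly like reducing window state does, so the number of stored entries stays at one per key no matter how many values arrive.

```scala
import scala.collection.mutable

// Hypothetical stand-in for per-key window state; not a Flink class.
class IncrementalAverage {
  // One (sum, count) accumulator per key instead of a buffer of all values.
  private val state = mutable.Map.empty[String, (Double, Long)]

  // Called for every incoming element: combine it with the current accumulator.
  def add(key: String, value: Double): Unit = {
    val (sum, count) = state.getOrElse(key, (0.0, 0L))
    state(key) = (sum + value, count + 1L)
  }

  // Called when the window fires: the "WindowFunction" step computes sum / count.
  def fire(): Map[String, Double] = {
    val result = state.map { case (k, (sum, count)) => k -> sum / count }.toMap
    state.clear()
    result
  }

  def storedEntries: Int = state.size
}

object IncrementalAverageDemo {
  def main(args: Array[String]): Unit = {
    val agg = new IncrementalAverage
    (1 to 1000).foreach(i => agg.add("a", i.toDouble))
    agg.add("b", 4.0)
    println(agg.storedEntries) // 2
    val result = agg.fire()
    println(result("a")) // 500.5
    println(result("b")) // 4.0
  }
}
```

Even after 1001 values, only two accumulators are held, one per key.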
Since the output of a ReduceFunction has the same type as its input, you need to add a field for the count before applying the ReduceFunction.
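Because `reduce` on a Scala collection has the same `(T, T) => T` shape as a ReduceFunction, the count-field trick can be checked on a plain `Seq`. This is a minimal sketch with a hypothetical object name: it appends `1L` to every element, reduces to a single `(key, sum, count)` tuple, and then divides.

```scala
object CountFieldTrick {
  // Hypothetical helper mirroring the map + reduce + divide steps for one window.
  // Assumes a non-empty window, as reduce throws on an empty Seq.
  def average(window: Seq[(String, Double)]): (String, Double) = {
    // Widen each element to (key, value, count = 1L) so input and output types match.
    val withCount: Seq[(String, Double, Long)] = window.map(x => (x._1, x._2, 1L))
    // Same combining function as the ReduceFunction: add values and counts.
    val (key, sum, count) = withCount.reduce((x, y) => (x._1, x._2 + y._2, x._3 + y._3))
    (key, sum / count)
  }

  def main(args: Array[String]): Unit = {
    println(average(Seq(("a", 2.0), ("a", 3.0), ("a", 7.0)))) // (a,4.0)
  }
}
```

Without the extra field, a `(T, T) => T` function over `(String, Double)` could only keep a running sum; averaging pairwise instead would weight later elements more heavily and give a wrong mean.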
Something like the following should do the trick:
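import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val valueStream: DataStream[(String, Double)] = ???

val r: DataStream[(String, Double)] = valueStream
  // append 1L for counting
  .map(x => (x._1, x._2, 1L))
  // key and window the stream
  .keyBy(0)
  .timeWindow(Time.minutes(5))
  .apply(
    // ReduceFunction (compute sum and count)
    (x: (String, Double, Long), y: (String, Double, Long)) =>
      (x._1, x._2 + y._2, x._3 + y._3),
    // WindowFunction (compute average)
    (key: Tuple, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => {
      // get the first (and only) value produced by the ReduceFunction
      val x: (String, Double, Long) = input.iterator.next()
      // compute the average as sum / count
      out.collect((x._1, x._2 / x._3))
    }
  )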