SPARK - Joining two data streams - maintenance of a cache
It is evident that the out-of-the-box join capability in Spark Streaming does not cover a lot of real-life use cases. The reason is that it only joins data contained within the micro-batch RDDs.

The use case: join data from two Kafka streams, enrich each object in stream1 with its corresponding object in stream2 in Spark, and save it to HBase.
Implementation
1. Maintain a dataset in memory of the objects from stream2, adding or replacing objects as and when they are received.
2. For every element in stream1, access the cache to find the matching object from stream2; save to HBase if a match is found, or put it back on a Kafka stream if not. (A setup sketch for the two input streams follows below.)
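For context, a minimal sketch of how the two input streams might be set up, assuming the spark-streaming-kafka-0-10 integration and String keys/values for concreteness. The topic names, group id, and checkpoint directory are assumptions for illustration, not from the question:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val ssc = new StreamingContext(
      new SparkConf().setAppName("StreamJoinEnrichment"), Seconds(10))
    // Stateful operations such as mapWithState require a checkpoint directory
    ssc.checkpoint("/tmp/stream-join-checkpoint")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream-join-example",
      "auto.offset.reset" -> "latest"
    )

    // Key each record so both streams become DStream[(K, V)] and can be joined later
    val stream1 = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("topic1"), kafkaParams)
    ).map(r => (r.key, r.value))

    val stream2 = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("topic2"), kafkaParams)
    ).map(r => (r.key, r.value))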
This question is about exploring Spark Streaming and its API to find a way to implement the above.
A start would be mapWithState, the more efficient replacement for updateStateByKey. These are defined on PairDStreamFunctions. Assuming your objects of type V in stream2 are identified by a key of type K, your first point would go like this:
    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream

    def stream2: DStream[(K, V)] = ???

    // Update the cached value for this key if a new one arrived, then emit the pair
    def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = {
      value.foreach(state.update(_))
      (key, state.get())
    }

    val spec = StateSpec.function(maintainStream2Objects _)
    val stream2State = stream2.mapWithState(spec)
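If the in-memory cache should not grow without bound, StateSpec also accepts a timeout, so keys that have not been updated for a while are evicted. A minimal sketch; the 60-minute idle duration is an assumed value, not something from the question:

    import org.apache.spark.streaming.Minutes

    // Keys idle longer than the timeout receive a final callback and are then removed
    val specWithTimeout = StateSpec.function(maintainStream2Objects _).timeout(Minutes(60))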
stream2State will be a stream where each batch contains the (K, V) pairs with the latest value seen for each key. You can do a join on this stream and stream1 to perform the further logic for your second point.
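To make the second point concrete, here is a hedged sketch. Note that the mapped stream from mapWithState only emits keys that appear in the current batch; its stateSnapshots() method yields the full state on every batch, which matches the cache semantics described above. saveToHBase and sendToKafka are hypothetical helpers, not part of any API:

    // Full cache contents each batch, not just the keys updated in this batch
    val cache: DStream[(K, V)] = stream2State.stateSnapshots()

    // Left outer join keeps every stream1 element, with its cached match if any
    val enriched = stream1.leftOuterJoin(cache)

    enriched.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach {
          case (key, (obj1, Some(obj2))) => saveToHBase(key, obj1, obj2) // hypothetical helper
          case (key, (obj1, None))       => sendToKafka(key, obj1)       // hypothetical helper
        }
      }
    }

Doing the writes inside foreachPartition lets HBase and Kafka connections be created once per partition rather than once per record.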