Samza: "Failed to send message" exception
I'm using Samza on AWS EMR instances, and I get the exception that follows. Can anyone help me?
org.apache.samza.samzaexception: failed send message. exception: java.lang.illegalstateexception: cannot send after producer closed. @ org.apache.samza.system.kafka.kafkasystemproducer$$anonfun$send$5.apply(kafkasystemproducer.scala:120) @ org.apache.samza.system.kafka.kafkasystemproducer$$anonfun$send$5.apply(kafkasystemproducer.scala:111) @ org.apache.samza.util.exponentialsleepstrategy.run(exponentialsleepstrategy.scala:81) @ org.apache.samza.system.kafka.kafkasystemproducer.send(kafkasystemproducer.scala:86) @ org.apache.samza.system.systemproducers.send(systemproducers.scala:87) @ org.apache.samza.task.taskinstancecollector.send(taskinstancecollector.scala:61) @ org.apache.samza.storage.kv.loggedstore.putall(loggedstore.scala:72) @ org.apache.samza.storage.kv.serializedkeyvaluestore.putall(serializedkeyvaluestore.scala:57) @ org.apache.samza.storage.kv.cachedstore.flush(cachedstore.scala:166) @ org.apache.samza.storage.kv.nullsafekeyvaluestore.flush(nullsafekeyvaluestore.scala:69) @ org.apache.samza.storage.kv.keyvaluestorageengine.flush(keyvaluestorageengine.scala:113) @ org.apache.samza.storage.kv.keyvaluestorageengine.close(keyvaluestorageengine.scala:125) @ org.apache.samza.storage.kv.keyvaluestorageengine.stop(keyvaluestorageengine.scala:119) @ org.apache.samza.storage.taskstoragemanager$$anonfun$stop$2.apply(taskstoragemanager.scala:147) @ org.apache.samza.storage.taskstoragemanager$$anonfun$stop$2.apply(taskstoragemanager.scala:147) @ scala.collection.iterator$class.foreach(iterator.scala:727) @ scala.collection.abstractiterator.foreach(iterator.scala:1157) @ scala.collection.maplike$defaultvaluesiterable.foreach(maplike.scala:206) @ org.apache.samza.storage.taskstoragemanager.stop(taskstoragemanager.scala:147) @ org.apache.samza.container.taskinstance.shutdownstores(taskinstance.scala:185) @ org.apache.samza.container.samzacontainer$$anonfun$shutdownstores$2.apply(samzacontainer.scala:650) @ 
org.apache.samza.container.samzacontainer$$anonfun$shutdownstores$2.apply(samzacontainer.scala:650) @ scala.collection.iterator$class.foreach(iterator.scala:727) @ scala.collection.abstractiterator.foreach(iterator.scala:1157) @ scala.collection.maplike$defaultvaluesiterable.foreach(maplike.scala:206) @ org.apache.samza.container.samzacontainer.shutdownstores(samzacontainer.scala:650) @ org.apache.samza.container.samzacontainer.run(samzacontainer.scala:560) @ org.apache.samza.container.samzacontainer$.safemain(samzacontainer.scala:93) @ org.apache.samza.container.samzacontainer$.main(samzacontainer.scala:67) @ org.apache.samza.container.samzacontainer.main(samzacontainer.scala)
I missed the real exception because I was looking in the wrong file.
Comments
Post a Comment