Siddhi Kafka Connection #1818

@programmedrn

Description:

I'm using siddhi-tooling-5.1.2 and trying to connect to Kafka from the web editor in Chrome.

This is my code:

`@App:name('KakfaUiSample')
@app:description('Receives events via simulation and publish them to a kafka topic in json format')

@sink(type = 'log',
@Map(type = 'json', validate.json = "true"))
define stream pm25LimitedStream(time string, pm25 double);
--Send events to kafka_json_topic

@source(type = 'kafka', bootstrap.servers = "kafka109:9092", topic.list = "airmap4", group.id = "SiddhiUiKafka", threading.option = "single.thread", enable.offsets.commit="true",
@Map(type = 'json'))
define stream KafkaAirmapStream (ts string, svcCode string, svcTgtSeq string, spotDevSeq string, groupTagCd string, occDt string, deviceModelId string, pm10 float, pm25 float, triOxygen float);

@info(name = 'pm25Limitation')
from KafkaAirmapStream
select ts as time, pm25
having pm25 > 70
order by time ASC
insert into pm25LimitedStream;
`

It worked the first time. I then stopped the app to tune some values, and when I started it again it began throwing this error:

`[2023-06-08_18-26-51_768] ERROR {io.siddhi.extension.io.kafka.source.ConsumerKafkaGroup} - Error while creating KafkaConsumerThread for topic(s): [airmap4]
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@636dc392 rejected from java.util.concurrent.ThreadPoolExecutor@599dc55c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at io.siddhi.extension.io.kafka.source.ConsumerKafkaGroup.run(ConsumerKafkaGroup.java:111)
at io.siddhi.extension.io.kafka.source.KafkaSource.connect(KafkaSource.java:278)
at io.siddhi.extension.io.kafka.source.KafkaSource.connect(KafkaSource.java:56)
at io.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:160)
at io.siddhi.core.SiddhiAppRuntimeImpl.startSources(SiddhiAppRuntimeImpl.java:502)
at io.siddhi.core.SiddhiAppRuntimeImpl.start(SiddhiAppRuntimeImpl.java:427)
at io.siddhi.distribution.editor.core.internal.DebugRuntime.start(DebugRuntime.java:93)
at io.siddhi.distribution.editor.core.internal.DebugProcessorService.start(DebugProcessorService.java:42)
at io.siddhi.distribution.editor.core.internal.EditorMicroservice.start(EditorMicroservice.java:761)
at io.siddhi.distribution.editor.core.internal.EditorMicroservice.startWithVariables(EditorMicroservice.java:781)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.wso2.msf4j.internal.router.HttpMethodInfo.invokeResource(HttpMethodInfo.java:187)
at org.wso2.msf4j.internal.router.HttpMethodInfo.invoke(HttpMethodInfo.java:143)
at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.dispatchMethod(MSF4JHttpConnectorListener.java:218)
at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.lambda$onMessage$58(MSF4JHttpConnectorListener.java:129)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
`

These are the contents of my /lib folder:
kafka_2.13_3.4.0_1.0.0.jar siddhi-execution-json-2.0.5.jar siddhi-execution-regex-5.0.5.jar siddhi-execution-unique-5.0.5.jar siddhi-io-file-2.0.5.jar siddhi-io-kafka-5.0.7.jar siddhi-io-s3-1.0.2.jar siddhi-map-csv-2.0.3.jar siddhi-map-text-2.0.4.jar siddhi-store-rdbms-7.0.5.jar kafka_clients_3.4.0_1.0.0.jar siddhi-execution-list-1.0.0.jar siddhi-execution-reorder-5.0.3.jar siddhi-execution-unitconversion-2.0.2.jar siddhi-io-grpc-1.0.8.jar siddhi-io-nats-2.0.10.jar siddhi-io-tcp-3.0.4.jar siddhi-map-json-5.0.6.jar siddhi-map-xml-5.0.3.jar siddhi-store-redis-3.1.1.jar metrics_core_2.2.0_1.0.0.jar siddhi-execution-map-5.0.5.jar siddhi-execution-string-5.0.9.jar siddhi-io-cdc-2.0.5.jar siddhi-io-http-2.2.0.jar siddhi-io-prometheus-2.1.1.jar siddhi-map-avro-2.0.6.jar siddhi-map-keyvalue-2.0.5.jar siddhi-script-js-5.0.3.jar siddhi_io_kafka_5.0.17_1.0.0.jar scala_library_2.13.10_1.0.0.jar siddhi-execution-math-5.0.4.jar siddhi-execution-time-5.0.4.jar siddhi-io-email-2.0.5.jar siddhi-io-jms-2.0.3.jar siddhi-io-rabbitmq-3.0.4.jar siddhi-map-binary-2.0.4.jar siddhi-map-protobuf-1.0.4.jar siddhi-store-mongodb-2.1.0.jar zookeeper_3.6.3_1.0.0.jar

My Kafka version is 2.13, and I'm using Java 11.
What I can't understand is that the code above works sometimes, but not consistently.
How can I deal with this issue?
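
For reference, the trace above reports the `ThreadPoolExecutor` as `Terminated` with `completed tasks = 1`, which is what a pool looks like after it has run one task and then been shut down. The following is a minimal plain-Java sketch of how a `RejectedExecutionException` arises in that situation. It is not Siddhi's code: the class and method names are hypothetical, and the idea that the Kafka source reuses an executor that was shut down when the app was stopped is only an assumption drawn from the trace.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration class; not part of Siddhi or siddhi-io-kafka.
public class RejectedAfterShutdown {

    // Returns true if a task submitted to an already shut-down executor is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(() -> { });   // first start: the task is accepted
        executor.shutdown();          // assumed analogue of stopping the app
        try {
            executor.submit(() -> { });   // second start on the same pool
            return false;
        } catch (RejectedExecutionException e) {
            // The default AbortPolicy rejects any task submitted after shutdown().
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdownIsRejected());
    }
}
```

If that assumption holds, the error would depend on whether a fresh executor is created between runs, which might explain why the same app starts cleanly some of the time.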

Affected Siddhi Version:

OS, DB, other environment details and versions:

Steps to reproduce:

Related Issues:
