Error on kafka 3.9.0 cluster #695

Open
andywang425 opened this issue Feb 7, 2025 · 0 comments

Describe the bug
Hi, I was following the Getting Started tutorial to learn how to use kafka-connect-file-pulse v2.14.1 on my self-hosted Kafka 3.9.0 cluster. I've checked that the Connect File Pulse plugin is loaded correctly. In the "Parsing a CSV file" section, I first started a new connector instance with the following config:

{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "filters": "ParseCSVLine",
  "filters.ParseCSVLine.extract.column.name": "headers",
  "filters.ParseCSVLine.trim.column": "true",
  "filters.ParseCSVLine.separator": ",",
  "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on": "COMMITTED",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/root/playground/output",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.listing.interval.ms": "10000",
  "file.filter.regex.pattern": ".*\\.csv$",
  "offset.policy.class": "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "offset.attributes.string": "name",
  "skip.headers": "1",
  "topic": "connect-file-pulse-quickstart-csv",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "slave1:9092,slave2:9092,slave3:9092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "tasks.file.status.storage.topic.partitions": 3,
  "tasks.file.status.storage.topic.replication.factor": 1,
  "tasks.max": 3
}

Then, when I checked the connector status, I got an error. The same error appeared in Kafka's log output:

[2025-02-08 00:25:23,522] ERROR [connect-file-pulse-quickstart-csv|task-0] WorkerSourceTask{id=connect-file-pulse-quickstart-csv-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:234)
java.lang.IllegalAccessError: failed to access class org.apache.kafka.common.utils.SystemTime from class io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset (org.apache.kafka.common.utils.SystemTime is in unnamed module of loader 'app'; io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @6156496)
	at io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset.empty(FileObjectOffset.java:26)
	at io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext.<init>(FileObjectContext.java:37)
	at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.addAll(DefaultFileRecordsPollingConsumer.java:90)
	at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:184)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:466)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
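
The JVM throws `IllegalAccessError` when a class references another class it is not allowed to access, for example a non-public class in a different package or class loader. That would be consistent with `SystemTime` no longer being public in the Kafka jars on the worker, but that is an assumption rather than something confirmed here. A minimal sketch for checking it, where `ClassVisibilityCheck` and `describe` are hypothetical names and not part of Kafka or File Pulse:

```java
import java.lang.reflect.Modifier;

// Diagnostic sketch: reports whether a class is public on the current classpath.
// ClassVisibilityCheck and describe() are hypothetical helper names.
class ClassVisibilityCheck {

    // Returns "public", "not public", or "not found" for the given class name.
    static String describe(String className) {
        try {
            // initialize=false loads the class without running its static initializers.
            Class<?> c = Class.forName(className, false,
                    ClassVisibilityCheck.class.getClassLoader());
            return Modifier.isPublic(c.getModifiers()) ? "public" : "not public";
        } catch (ClassNotFoundException e) {
            return "not found";
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace above.
        String name = args.length > 0
                ? args[0]
                : "org.apache.kafka.common.utils.SystemTime";
        System.out.println(name + ": " + describe(name));
    }
}
```

Running it with the Kafka jars on the classpath (for instance `java -cp "libs/*:." ClassVisibilityCheck`, assuming the jars live under `libs/`) should show whether the class is still accessible from outside its package.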

Any help would be appreciated.

To Reproduce
Start a new connector instance with the config above.

Expected behavior
The task runs successfully.

Additional context
Version: kafka-connect-file-pulse v2.14.1, Kafka 3.9.0 in KRaft mode (3 workers), Ubuntu 24.04

I cloned the repo and changed org.apache.kafka.version in pom.xml to 3.9.0. Then I opened src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectOffset.java and saw the following error:

[Image: screenshot of the error shown in FileObjectOffset.java]

Maybe that's the cause of the problem? Is there anything I can do besides downgrading Kafka?
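
One possible direction when rebuilding the plugin, sketched under assumptions: the stack trace shows `FileObjectOffset.empty()` touching `SystemTime`, so if that method only needs the current time in milliseconds, it could read it from a public clock instead. `OffsetSketch` and its fields below are hypothetical stand-ins, not the actual File Pulse code. With kafka-clients on the classpath, `Time.SYSTEM.milliseconds()` is the public Kafka equivalent of the JDK call used here.

```java
// Hypothetical stand-in for the offset type; names and fields are assumptions,
// not the actual File Pulse source.
final class OffsetSketch {
    final long position;
    final long rows;
    final long timestamp;

    OffsetSketch(long position, long rows, long timestamp) {
        this.position = position;
        this.rows = rows;
        this.timestamp = timestamp;
    }

    // Builds an "empty" offset stamped with the current wall-clock time.
    // Reads the JDK clock rather than referencing Kafka's SystemTime class directly.
    // In the plugin itself, Time.SYSTEM.milliseconds() from
    // org.apache.kafka.common.utils.Time would be the public Kafka alternative.
    static OffsetSketch empty() {
        return new OffsetSketch(-1L, 0L, System.currentTimeMillis());
    }
}
```

Whether this matches what line 26 of FileObjectOffset.java actually does would need to be checked against the source before applying anything like it.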
