A problem was reported today with Flume reading data from Kafka and writing it to object storage:

UnsupportedOperationException “XX streams are not Syncable. See HADOOP-17597.”

The error points to HADOOP-17597; the issue details are here: https://issues.apache.org/jira/browse/HADOOP-17597

The gist of the issue is that when writing to object storage, no data becomes visible until the stream is closed; the data is only committed on close, and if the write fails with an error, none of the data ends up in the store.

The behavior can be downgraded by adding an HDFS parameter:

fs.xx.downgrade.syncable.exceptions=true

which downgrades the exception so that it is ignored instead of failing the write.
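A minimal core-site.xml sketch of how this flag could be set. For an S3A-backed store, HADOOP-17597 names the property fs.s3a.downgrade.syncable.exceptions; for other object stores, substitute the scheme-specific key documented by your connector (the S3A name is used here only as an example):

<!-- core-site.xml: downgrade the Syncable rejection to a warning (S3A example; adjust the key for your store) -->
<property>
  <name>fs.s3a.downgrade.syncable.exceptions</name>
  <value>true</value>
</property>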

With this parameter set, the agent does start, but two problems remain:
Data is lost in a stable, alternating pattern. For example, when test1, test2, test3, test4 are received, test2 and test4 are consistently lost. At the same time, the log keeps printing this error:

2022-09-08 15:59:07,297 ERROR flume.SinkRunner (SinkRunner.java:run(158)) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.FlumeException: Error while trying to hflushOrSync!
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.FlumeException: Error while trying to hflushOrSync!
at org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync(AbstractHDFSWriter.java:274)
at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:126)
at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:520)
at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:517)
at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:727)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:724)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync(AbstractHDFSWriter.java:266)
... 10 more

Looking into how the cluster was deployed revealed the real root cause: a Flume Agent was installed on every node. Once multiple Flume Agents run with the same configuration, a file with the same name can be opened by different agents. When the file is opened a second time, the token held by the agent that opened it first becomes invalid, so that agent can no longer close the file and keeps throwing: Error while trying to hflushOrSync!

With test1 test2 test3 test4, the records consistently lost are test2 and test4: test1 is picked up by agent 1 and written, test2 is picked up by agent 2, but agents 1 and 2 are writing to the same file. Agent 1's write succeeds but its close fails, throwing the error above; agent 2 tries to open a stream that is already open and fails, so test2 is lost. Hence the stable alternating loss of test2 and test4.
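One way to avoid the collision is to make each agent write its own files, for example by putting the local hostname into the HDFS sink's file prefix. A minimal flume.conf sketch, assuming a source named a1.sources.r1 and an HDFS sink named a1.sinks.k1 (the component names and the host-interceptor setup are illustrative, not taken from the original deployment):

# Host interceptor: stamps each event with the local hostname in a "host" header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = host

# Use the header in the file prefix so two agents never open the same object
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = events-%{host}
a1.sinks.k1.hdfs.useLocalTimeStamp = true

With a distinct prefix per agent, no two agents ever hold the same object open, so the close/hflushOrSync failures and the alternating data loss should disappear.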
