
Kafka Streams: UnsatisfiedLinkError on librocksdbjni

  •  mukesh210  ·  asked 6 years ago

    I am trying the word-count example with Kafka Streams, using Kafka 1.1.0, Scala 2.11.12, and sbt 1.1.4. I am getting the following error:

    Exception in thread "wordcount-application-d81ee069-9307-46f1-8e71-c9f777d2db64-StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni5439068356048679315.dll: À¦¥Y
    at java.lang.ClassLoader$NativeLibrary.load(Native Method)
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
    at java.lang.Runtime.load0(Runtime.java:809)
    at java.lang.System.load(System.java:1086)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
    at org.rocksdb.Options.<clinit>(Options.java:25)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:116)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:167)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.init(InnerMeteredKeyValueStore.java:160)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.init(MeteredKeyValueBytesStore.java:102)
    at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:225)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:162)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:316)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
    

    I have already tried the solution given here: UnsatisfiedLinkError on Lib rocks DB dll when developing with Kafka Streams

    Below is the Scala code I tried:

    import java.lang
    import java.util.Properties
    import java.util.concurrent.TimeUnit

    import scala.collection.JavaConverters._

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.kstream._
    import org.apache.kafka.streams.state.KeyValueStore
    import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

    object WordCountApplication {
    
      def main(args: Array[String]): Unit = {
        val config: Properties = {
          val p = new Properties()
          p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
          p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
          p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
          p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
          p
        }
    
        val builder: StreamsBuilder = new StreamsBuilder()
        val textLines: KStream[String, String] = builder.stream("streams-plaintext-input")
    
        val afterFlatMap: KStream[String, String] = textLines.flatMapValues(new ValueMapper[String,java.lang.Iterable[String]] {
          override def apply(value: String): lang.Iterable[String] = value.split("\\W+").toIterable.asJava
        })
    
        val afterGroupBy: KGroupedStream[String, String] = afterFlatMap.groupBy(new KeyValueMapper[String,String,String] {
          override def apply(key: String, value: String): String = value
        })
    
    
        val wordCounts: KTable[String, Long] = afterGroupBy
          .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
        wordCounts.toStream().to("streams-wordcount-output", Produced.`with`(Serdes.String(), Serdes.Long()))
    
        val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
        streams.start()
    
        Runtime.getRuntime.addShutdownHook(new Thread(
          new Runnable{
            override def run() = streams.close(10, TimeUnit.SECONDS)}
        ))
      }
    }
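For context, the transformation this topology applies (split each line on non-word characters, group by word, count) can be sketched without Kafka in plain Scala. This is only an illustration of the logic; `WordCountLogic` and `countWords` are hypothetical names, not part of the Streams API:

```scala
object WordCountLogic {
  // Plain-Scala mirror of the topology: flatMapValues -> groupBy(value) -> count.
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split("\\W+")) // same tokenisation as the ValueMapper above
      .filter(_.nonEmpty)       // split can yield empty tokens on leading separators
      .groupBy(identity)        // key each token by itself, like the KeyValueMapper
      .map { case (word, occurrences) => word -> occurrences.size.toLong }
}
```

For example, `WordCountLogic.countWords(Seq("hello world", "hello"))` yields `Map("hello" -> 2, "world" -> 1)`.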
    

    build.sbt:

    name := "KafkaStreamDemo"
    
    version := "0.1"
    
    scalaVersion := "2.11.12"
    
    libraryDependencies ++= Seq(
      "org.apache.kafka" %% "kafka" % "1.1.0",
      "org.apache.kafka" % "kafka-clients" % "1.1.0",
      "org.apache.kafka" % "kafka-streams" % "1.1.0",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
    

    If anyone has faced this issue before, please help.

    2 Answers
        Answer 1
  •  2
  •   mukesh210    6 years ago

    Finally, I found an answer that worked. I followed Unable to load rocksdbjni

    I did two things that worked for me:

    1) I installed the Visual C++ Redistributable for Visual Studio 2015.

    2) Previously, I was using RocksDB 5.7.3 with Kafka Streams 1.1.0 (RocksDB 5.7.3 ships with Kafka Streams 1.1.0 by default). I excluded the rocksdb dependency from the kafka-streams dependency and added RocksDB 5.3.6 instead. For reference, here is my build.sbt now:

    name := "KafkaStreamDemo"
    
    version := "0.1"
    
    scalaVersion := "2.12.5"
    
    libraryDependencies ++= Seq(
      "org.apache.kafka" %% "kafka" % "1.1.0",
      "org.apache.kafka" % "kafka-clients" % "1.1.0",
      "org.apache.kafka" % "kafka-streams" % "1.1.0" exclude("org.rocksdb", "rocksdbjni"),
      "ch.qos.logback" % "logback-classic" % "1.2.3",
      "org.rocksdb" % "rocksdbjni" % "5.3.6"
    )
    

    Hope it helps someone.

    Thanks.

        Answer 2
  •  2
  •   Dilip Murupala    6 years ago

    In my case, I was using Kafka Streams 1.0.2. Changing the base Docker image from alpine-jdk8:latest to openjdk:8-jre worked. (Alpine images use musl libc, while the prebuilt RocksDB native library links against glibc.)

    This link - https://github.com/docker-flink/docker-flink/pull/22 - helped me find the solution.