代码之家  ›  专栏  ›  技术社区  ›  Mario Corral

如何使用cygnus将多个orion实体合并到一个ckan数据集?

  •  4
  • Mario Corral  · 技术社区  · 8 年前

    现在我正在尝试为我的orion数据创建一个包含所有实体信息的数据集。 我正确设置了所有数据,当一些数据更改时,或向cygnus发送通知,然后cygnus将数据添加到ckan。 如图所示。

    My arquitecture

    问题是cygnus为每个实体创建一个ckan资源。 我该怎么做才能让cygnus将所有数据放在一个资源中? 谢谢

    编辑1:

    我发现我可以通过在orion上只订阅一次来简化。

    {
        "entities": [
            {
                "type": "Room",
                "isPattern": "true",
                "id": "Room.*"
            }
        ] ...
    

    然后我注意到了分组功能( http://fiware-cygnus.readthedocs.io/en/develop/installation_and_administration_guide/grouping_rules_conf/index.html )但现在我迷路了,因为cygnus不想加载我的分组配置文件。它显示此错误:

    time=2016-05-03T05:32:29.658CDT | lvl=INFO | trans= | srv= | subsrv= | function=<init> | comp=Cygnus | msg=com.telefonica.iot.cygnus.interceptors.GroupingRules[58] : No grouping rules have been read
    Exception in thread "Thread-1" java.lang.NullPointerException
        at java.io.File.<init>(File.java:277)
        at com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$ConfigurationReader.run(GroupingInterceptor.java:244)
    

    在我的代理配置文件中,我有:

    cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
    

    在grouping_rules.conf中:

    {
        "grouping_rules": [
            {
                "id": 1,
                "fields": [
                    "entityId"
                ],
                "regex": "room.*",
                "destination": "Rooms",
                "fiware_service_path": "/myhouse"
            }
        ]
    }
    

    编辑2:

    我有文档所述的所有行:

        cygnusagent.sources = http-source
        cygnusagent.sinks = ckan-sink
        cygnusagent.channels = ckan-channel
    
        cygnusagent.sources.http-source.channels = ckan-channel
        cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
        cygnusagent.sources.http-source.port = 5050
        cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
        cygnusagent.sources.http-source.handler.notification_target = /notify
        cygnusagent.sources.http-source.handler.default_service = test
        cygnusagent.sources.http-source.handler.default_service_path = /myhouse
        cygnusagent.sources.http-source.handler.events_ttl = 5
        cygnusagent.sources.http-source.interceptors = ts gi
        cygnusagent.sources.http-source.interceptors.ts.type = timestamp
        cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
        cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
    cygnusagent.channels.ckan-channel.type = memory
    cygnusagent.channels.ckan-channel.capacity = 1000
    cygnusagent.channels.ckan-channel.transactionCapacity = 100
    
    # ============================================
    # OrionCKANSink configuration
    # channel name from where to read notification events
    cygnusagent.sinks.ckan-sink.channel = ckan-channel
    
    # sink class, must not be changed
    cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
    
    # true if the grouping feature is enabled for this sink, false otherwise
    cygnusagent.sinks.ckan-sink.enable_grouping = true
    
    # true if lower case is wanted to forced in all the element names, false otherwise
    cygnusagent.sinks.hdfs-sink.enable_lowercase = false
    
    # the CKAN API key to use
    cygnusagent.sinks.ckan-sink.api_key = 436fffc8-b397-478a-92fd-bbc5ffaf8269
    
    # the FQDN/IP address for the CKAN API endpoint
    cygnusagent.sinks.ckan-sink.ckan_host = ckan-demo.ckan.io
    
    # the port for the CKAN API endpoint
    cygnusagent.sinks.ckan-sink.ckan_port = 80
    
    # Orion URL used to compose the resource URL with the convenience operation URL to query it
    cygnusagent.sinks.ckan-sink.orion_url = http://localhost:1026
    
    # how the attributes are stored, either per row either per column (row, column)
    cygnusagent.sinks.ckan-sink.attr_persistence = column
    
    # enable SSL for secure Http transportation; 'true' or 'false'
    cygnusagent.sinks.ckan-sink.ssl = false
    
    # number of notifications to be included within a processing batch
    cygnusagent.sinks.ckan-sink.batch_size = 100
    
    # timeout for batch accumulation
    cygnusagent.sinks.ckan-sink.batch_timeout = 60
    
    # number of retries upon persistence error
    cygnusagent.sinks.ckan-sink.batch_ttl = 10
    

    文件权限对我来说似乎没问题:

    [root@VM013cen-Prod conf]# ls *.conf -l
    -rwxrwxrwx 1 cygnus cygnus 2675 may  3 06:45 agent_test.conf
    -rwxrwxrwx 1 cygnus cygnus  258 may  3 05:08 grouping_rules.conf
    -rwxr-xr-x 1 cygnus cygnus  135 mar  1 02:50 krb5_login.conf
    

    测试通过:

    Results :
    
    Tests run: 80, Failures: 0, Errors: 0, Skipped: 0
    
        [INFO] ------------------------------------------------------------------------
        [INFO] BUILD SUCCESS
        [INFO] ------------------------------------------------------------------------
        [INFO] Total time: 1:05.862s
        [INFO] Finished at: Tue May 03 06:53:44 CDT 2016
        [INFO] Final Memory: 41M/105M
        [INFO] ------------------------------------------------------------------------
    

    我使用的启动命令: /usr/cygnus/bin/cygnus-flume-ng agent --conf /usr/cygnus/conf/ -f /usr/cygnus/conf/agent_test.conf -n cygnusagent -Dflume.root.logger=INFO,console

    1 回复  |  直到 8 年前
        1
  •  2
  •   frb    8 年前

    是的,您是对的,分组规则是为实现您所描述的用例而设计的。因此,通过定义一个简单的规则,用一个正则表达式匹配所有实体的类型(假设这种类型总是相同的),就可以做到这一点;或者使用与实体ID的公共部分匹配的正则表达式(如您所建议的)。

    然而,你的问题似乎是分组规则本身。您说您的配置包含:

    cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
    

    你能在那句话之前确认一下你有这样的东西吗?

    cygnus-ngsi.sources.http-source.interceptors = ts gi
    cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
    cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
    

    此外,您可以检查 /usr/cygnus/conf/grouping_rules.conf ? 也许运行Cygnus的用户无法读取它。

    关于规则,新的FIWARE服务路径应该以 / ,因为所有FIWARE服务路径都必须以斜杠开头(我认为这在上一版本中没有正确记录)。

    哈!

    编辑1 :

    我想我已经找到问题了。配置的参数名称为“…gro p uing…”,而正确的是“…grouping…”:)