代码之家  ›  专栏  ›  技术社区  ›  jovicbg

如何在NIFI中将参数传递给python脚本

  •  1
  • jovicbg  · 技术社区  · 6 年前

    也许这是个愚蠢的问题,但我不得不问。

    我在NIFI中有一个collect_数据处理器,它将消息流到另一个进程中,该进程使用Python脚本来解析消息并创建JSON文件。问题是我不知道Python脚本中函数的输入是什么。如何将这些消息(16位数字)从collect_数据处理器传递到包含python脚本的下一个处理器。这方面有什么好的、基本的例子吗?

    我已经在网上找了一些例子,但还没有真正找到。

    import datetime
    import hashlib
    from urlparse import urlparse, parse_qs
    import sys
    from urlparse import urlparse, parse_qs
    from datetime import *
    import json
    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    from time import time
    
    
    def parse_zap(inputStream, outputStream):
        data = inputStream
        buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
        buf = int(buf, 16)
        buf_check = str(buf)
        if buf_check[17] == 2:
            pass
        datetime_now = datetime.now()
        log_date = datetime_now.isoformat()
        try:
            mac = buf_check[7:14].upper()
            ams_id = buf_check[8:]
            action = buf_check[3:4]
            time_a = int(time())
            dict_test = {
            "user": {
                "guruq" : 'false'
            },
            "device" : {
                "type" : "siolbox",
                "mac": mac
            },
            "event" : {
                "origin" : "iptv",
                "timestamp": time_a,
                "type": "zap",
                "product-type" : "tv-channel",
                "channel": {
                    "id" : 'channel_id',
                    "ams-id": ams_id
                },
                "content": {
                    "action": action
                }
            }
            }
            return dict_test
        except Exception as e:
            print('%s nod PARSE 500 \"%s\"' % (log_date, e))
    

    感谢我正确阅读,但现在我无法创建输出。 事先谢谢。

    2 回复  |  直到 6 年前
        1
  •  3
  •   Óscar Andreu    6 年前

    看看这个脚本:

    import json
    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    
    class PyStreamCallback(StreamCallback):
      def __init__(self):
            pass
      def process(self, inputStream, outputStream):
        text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
        for line in text[1:]:
            outputStream.write(line + "\n") 
    
    flowFile = session.get()
    if (flowFile != None):
      flowFile = session.write(flowFile,PyStreamCallback())
      flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
      session.transfer(flowFile, REL_SUCCESS)
    

    它从一个属性中获取要从流文件中删除的行数,然后获取流文件并在没有这些行的情况下再次写入它,这是一个简单且很好的例子,说明了如何使用属性以及如何使用流文件。

    根据更新后的代码,您的代码必须如下所示:

    import datetime
    import hashlib
    from urlparse import urlparse, parse_qs
    import sys
    from urlparse import urlparse, parse_qs
    from datetime import *
    import json
    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    from time import time
    
    
    class PyStreamCallback(StreamCallback):
      def __init__(self):
            pass
      def process(self, inputStream, outputStream):
        data = inputStream
        buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
        buf = int(buf, 16)
        buf_check = str(buf)
        if buf_check[17] == 2:
            pass
        datetime_now = datetime.now()
        log_date = datetime_now.isoformat()
        try:
            mac = buf_check[7:14].upper()
            ams_id = buf_check[8:]
            action = buf_check[3:4]
            time_a = int(time())
            dict_test = {
            "user": {
                "guruq" : 'false'
            },
            "device" : {
                "type" : "siolbox",
                "mac": mac
            },
            "event" : {
                "origin" : "iptv",
                "timestamp": time_a,
                "type": "zap",
                "product-type" : "tv-channel",
                "channel": {
                    "id" : 'channel_id',
                    "ams-id": ams_id
                },
                "content": {
                    "action": action
                }
            }
            }
            return dict_test
        except Exception as e:
            print('%s nod PARSE 500 \"%s\"' % (log_date, e))
    
    flowFile = session.get()
    if (flowFile != None):
      flowFile = session.write(flowFile,PyStreamCallback())
      flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
      session.transfer(flowFile, REL_SUCCESS)        
    
        2
  •  2
  •   Andy    6 年前

    我想我理解你的问题,但对你的流程有点模棱两可。我在回答几个不同的可能情况。

    1. 您有一个从源(即 FetchFTP )并与 ExecuteScript 包含用于转换这些值的python脚本的处理器。在这种情况下,python脚本可以直接使用标准API对流文件属性和内容进行操作。见 Matt Burgess' blog 用于编写自定义脚本以对数据进行操作的许多示例。
    2. 您有一个从源获取数据并连接到 ExecuteStreamCommand 处理器,它使用如下命令调用外部python脚本 python my_external_script.py arg1 arg2 ... . 在这种情况下,流文件内容被传递到 STDIN 执行团队命令 处理器,所以脚本应该以这种方式使用它。 This answer explains 有关使用的详细信息 执行团队命令 使用python脚本。
    3. 您有一个自定义处理器,它在内部调用单独的Python进程。这是一个坏主意,应该重构为其他模型之一。这打破了关注点的分离,失去了处理器生命周期的帮助,模糊了线程处理和时间安排,缺乏出处可见性,与NIFI的开发模型背道而驰。

    如果您的python脚本非常简单,您可以将它放在 ScriptedRecordWriter 并使用它同时处理多个“记录”以获得性能优势。这对于您的用例来说可能是高级的,这取决于您的流和传入数据的外观。

    更新日期:2018-10-03 10:50

    尝试在 执行脚本 身体:

    import json
    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    
    class PyStreamCallback(StreamCallback):
        def __init__(self):
            pass
        def process(self, inputStream, outputStream):
            text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            result = parse_zap(text)
    
            outputStream.write(bytearray(result.encode('utf-8')))
    
    flowFile = session.get()
    if (flowFile != None):
        flowFile = session.write(flowFile,PyStreamCallback())
        flowFile = session.putAttribute(flowFile, "parsed_zap", "true")
        session.transfer(flowFile, REL_SUCCESS)
    
    // Your parse_zap() method here, with the signature changed to just accept a single string
    ...