代码之家  ›  专栏  ›  技术社区  ›  Traiano Welcome

[Golang][kafka]“exec.Command()”在运行kcat/kafkaat时不产生输出,或任何错误状态

  •  0
  • Traiano Welcome  · 技术社区  · 3 年前

    我需要包起来 kcat 在Go函数中读取一系列主题消息,因此考虑使用 exec.Command() 原因如下:

    package main
    
    import (
        "fmt"
        "os/exec"
    )
    
    func main() {
    
        cmd := exec.Command("kcat", "-b kafka.kafka.svc.cluster.local:9092", "-t messages", "-o 11000", "-c 11333")
    
        fmt.Println("Command String:", cmd.String())
    
        out, err := cmd.CombinedOutput()
    
        if err != nil {
            fmt.Println("Error Accessing kafka topic messages ", err.Error(), string(out))
            return
        }
    
        fmt.Println("Result Length:", len(out))
    
        fmt.Println("Result Content:", string(out))
    
    }
    
    

    但是,这只返回的第一行 kcat 输出:

    /app/tools # ./five
    Command String: /usr/bin/kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 11000 -c 11333
    Result Length: 58
    
    Result Content: % Auto-selecting Producer mode (use -P or -C to override)
    
    

    (注意:我在docker容器中运行这个,但我认为它没有什么区别)

    但是,当直接从CLI运行时,此操作效果良好:

    /app/tools # 
    /app/tools # kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 10 -c 15
    % Auto-selecting Consumer mode (use -P or -C to override)
    %4|1640957136.462|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [1]: offset reset (at offset 10) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
    %4|1640957136.483|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [2]: offset reset (at offset 10) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
    [{"Name":"newOrder", "ID":"9266","Time":"9266","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"1547","Time":"1547","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"9179","Time":"9179","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"8740","Time":"8740","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"9318","Time":"9318","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"1743","Time":"1743","Data":"new order", "Eventname":"newOrder"}]
    
    

    似乎有一些独特之处 kcat 中断的命令 执行官。命令() 在Go中。

    问题:

    • 我在围棋中还有其他方法可以达到同样的效果吗?
    • 这可能是我使用方式的问题吗 执行官。命令()

    理想情况下,我可以使用 kcat 命令,因为我想避免使用segmentios kafka-go 库。

    [编辑]

    • 分开论点(如@onecricketer所建议的):
    cmd := exec.Command("kcat", "-b", "kafka.kafka.svc.cluster.local:9092", "-t", "messages", "-o", "11000", "-c", "11333")
    

    结果(相同错误):

    /app/tools # ./code
    Command String: /usr/bin/kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 11000 -c 11333
    Result Length: 58
    Result Content: % Auto-selecting Producer mode (use -P or -C to override)
    
    • 使用BASH作为外壳(由maxm建议):

    相同的结果,即仅报告kcat输出的第一行:

    /app/tools # ./code
    Command String: /bin/bash -c kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 11000 -c 11333
    Result Length: 58
    Result Content: % Auto-selecting Producer mode (use -P or -C to override)
    

    [编辑]

    注意:然而,当我使用Python的shell执行机制时,它工作得很好,这让我怀疑Gos的shell处理功能是否有缺陷:

    import subprocess
    
    process = subprocess.Popen(["kcat","-b","kafka.kafka.svc.cluster.local:9092","-t","messages","-o","1", "-c", "11"], 
                               stdout=subprocess.PIPE,
                               universal_newlines=True)
    
    while True:
        output = process.stdout.readline()
        print(output.strip())
        # Do something else
        return_code = process.poll()
        if return_code is not None:
            print('RETURN CODE', return_code)
            # Process has finished, read rest of the output 
            for output in process.stdout.readlines():
                print(output.strip())
            break
    
    

    结果:

    /app/tools/python # python3 code.py 
    % Auto-selecting Consumer mode (use -P or -C to override)
    %4|1641004616.232|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [2]: offset reset (at offset 1) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
    %4|1641004616.236|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [1]: offset reset (at offset 1) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
    [{"Name":"newOrder", "ID":"4512","Time":"4512","Data":"new order", "Eventname":"newOrder"}]
    
    RETURN CODE 0
    [{"Name":"newOrder", "ID":"2388","Time":"2388","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"8707","Time":"8707","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"1643","Time":"1643","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"2421","Time":"2421","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"7520","Time":"7520","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"1258","Time":"1258","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"1457","Time":"1457","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"2907","Time":"2907","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"9266","Time":"9266","Data":"new order", "Eventname":"newOrder"}]
    
    [{"Name":"newOrder", "ID":"1547","Time":"1547","Data":"new order", "Eventname":"newOrder"}]
    
    
    
    
    0 回复  |  直到 3 年前
        1
  •  1
  •   OneCricketeer Gabriele Mariotti    3 年前

    正如输出所说,生产者模式是自动选择的

    尝试使用带有分隔参数的使用者模式

    cmd := exec.Command("kcat", "-C", "-b", "kafka.kafka.svc.cluster.local:9092", "-t", "messages", "-o", "11000", "-c", "11333")
    
        2
  •  1
  •   BMitch    3 年前

    go命令:

    cmd := exec.Command("kcat", "-b kafka.kafka.svc.cluster.local:9092", "-t messages", "-o 11000", "-c 11333")
    

    与shell命令相同:

    kcat "-b kafka.kafka.svc.cluster.local:9092" "-t messages" "-o 11000" "-c 11333"
    

    您需要将参数分开,与默认情况下shell在每个空间上为您所做的操作相同:

    cmd := exec.Command("kcat", "-b", "kafka.kafka.svc.cluster.local:9092", "-t", "messages", "-o", "11000", "-c", "11333")