代码之家  ›  专栏  ›  技术社区  ›  kovac

如何使用librdkafka发送json数据?

  •  0
  • kovac  · 技术社区  · 3 年前

    我正在尝试使用librdkafka c api发送json有效负载。我现在想做的是

    #include <jansson.h>
    #include <librdkafka/rdkafka.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <time.h>
    
    typedef struct my_data {
        char *id;
        char *value;
        unsigned long timestamp;
    } my_data;
    
    char * my_data_to_json(const my_data *ev);
    void rk_dr_callback(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque);
    
    int main(int argc, char * argv[])
    {
        // configure producer
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        char errstr[512];
    
        rd_kafka_conf_set(conf, "bootstrap.servers", "k1.example.com:9093", errstr, sizeof(errstr));    
        
        rd_kafka_conf_set_dr_msg_cb(conf, rk_dr_callback);
    
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));    
    
        if (!rk) {
            fprintf(stderr, "failed to create kafka producer: %s\n", errstr);
            return -1;
        }
    
        // create json
        my_data ev = {
            .id = "test-id",
            .value = "test-value",
            .timestamp = (unsigned long) time(NULL)
        };
    
        char *json = my_data_to_json(&ev);
        printf("json dump: %s\n", json);
    
        // publish data
        rd_kafka_resp_err_t err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("topic"),
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(json, strlen(json)),
            RD_KAFKA_V_KEY("key-1", strlen("key-1")), RD_KAFKA_V_OPAQUE(NULL),
            RD_KAFKA_V_END);
    
        // cleanup
        rd_kafka_flush(rk, 5 * 1000);
        
        if (rd_kafka_outq_len(rk) > 0) {
            fprintf(stderr, "%d message(s) were not delivered\n", rd_kafka_outq_len(rk));
        }
    
        rd_kafka_destroy(rk);
        free(json);
    
        return 0;
    }
    
    void rk_dr_callback(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
    {
        if (msg->err) {
            printf("failed to send message: %s\n", rd_kafka_err2str(msg->err));
        }
        else {
            printf("delivered %zd bytes to partition %d\n", msg->len, msg->partition);
        }
    }
    
    char * my_data_to_json(const my_data *ev)
    {
        /* build the JSON object {"id": "id", "value": "value", "timestamp": 12345678} */
        json_t *json = json_pack("{sssssi}", "id", ev->id, "value", ev->value, "timestamp", ev->timestamp);
    
        if (!json) {
            fprintf(stderr, "failed to construct json from data\n");
        }
    
        char *str = json_dumps(json, JSON_COMPACT);
    
        if (!str) {
            fprintf(stderr, "failed to encode json object\n");
        }
    
        return str;
    }
    

    使用上面的代码,我设法将字节发送到代理。但是json有效载荷似乎格式不正确。消费者(消费者是一个使用Newtonsoft json库进行反序列化的C#客户端)抛出以下错误:

    Newtonsoft.Json.JsonReaderException: Unexpected character encountered while parsing value: . Path '', line 0, position 0.\n at Newtonsoft.Json.JsonTextReader.ParseValue()\n at ...
    

    我不太清楚我的错误是在构造json对象、将其编码为字符串的方式,还是在使用librdkafka发布json字符串的方式上。

    0 回复  |  直到 3 年前
    推荐文章