
Writing a custom Flume decorator and getting an error. What am I missing?

  • Josh Lindsey  ·  14 years ago

    I am writing a custom decorator plugin for Flume, Cloudera's distributed log aggregation system. My Java code is below:

    package multiplex;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import com.cloudera.flume.conf.Context;
    import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
    import com.cloudera.flume.core.Event;
    import com.cloudera.flume.core.EventImpl;
    import com.cloudera.flume.core.EventSink;
    import com.cloudera.flume.core.EventSinkDecorator;
    import com.cloudera.util.Pair;
    import com.google.common.base.Preconditions;
    
    public class JsonMultiplexDecorator<S extends EventSink> extends EventSinkDecorator<S> {
      private final String serverName;
      private final String logType;
    
      public JsonMultiplexDecorator(S s, String serverName, String logType) {
        super(s);
    
        this.serverName = serverName;
        this.logType = logType;
      }
    
      @Override
      public void append(Event e) throws IOException {
        String body = new String(e.getBody()).replaceAll("\"", "\\\"");
    
        String json = "{ \"server\": \"" + this.serverName + "\"," +
          "\"log_type\": \"" + this.logType + "\", " +
          "\"body\": \"" + body + "\" }";
    
        EventImpl e2 = new EventImpl(json.getBytes(),
            e.getTimestamp(), e.getPriority(), e.getNanos(), e.getHost(),
            e.getAttrs());
    
        super.append(e2);
      }
    
      public static SinkDecoBuilder builder() {
        return new SinkDecoBuilder() {
          @Override
          public EventSinkDecorator<EventSink> build(Context context,
              String... argv) {
            Preconditions.checkArgument(argv.length == 2,
                "usage: multiplexDecorator(serverName, logType)");
    
            return new JsonMultiplexDecorator<EventSink>(null, argv[0], argv[1]);
          }
        };
      }
    
      public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
        List<Pair<String, SinkDecoBuilder>> builders = 
          new ArrayList<Pair<String, SinkDecoBuilder>>();
    
        builders.add(new Pair<String, SinkDecoBuilder>("jsonMultiplexDecorator", builder()));
    
        return builders;
      }
    }
    

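    This compiles into a JAR file with ant without problems, and I can load it into Flume at runtime and successfully configure a node to use it. However, when an event actually arrives on a node with this plugin loaded, errors like the following appear in my logs: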

    2010-10-19 21:03:15,176 [logicalNode xxxxx] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because null
    java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableMap.put(Collections.java:1285)
        at com.cloudera.flume.core.EventBaseImpl.set(EventBaseImpl.java:65)
        at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:164)
        at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.append(DiskFailoverDeco.java:93)
        at com.cloudera.flume.core.BackOffFailOverSink.append(BackOffFailOverSink.java:144)
        at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:109)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
        at multiplex.JsonMultiplexDecorator.append(JsonMultiplexDecorator.java:56)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
        at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:69)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)
    

    (In [logicalNode xxxxx]

    1 answer

  • phooji  ·  13 years ago

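    When you construct EventImpl e2, you pass e.getAttrs() straight through. Judging by the stack trace, that map is unmodifiable (Collections$UnmodifiableMap), and a downstream sink later tries to write to it: RollSink.append calls EventBaseImpl.set, which calls put on the map and throws UnsupportedOperationException. Try passing in a copy of the attributes instead; new HashMap(e.getAttrs()) should be enough.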
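    The failure and the fix can be reproduced without Flume. The following is a minimal sketch using only java.util; the class name AttrCopyDemo, the tag helper, and the "demoTag" key are hypothetical stand-ins for what a downstream sink does to the event, and the Map<String, byte[]> attribute type is an assumption about what getAttrs() returns.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AttrCopyDemo {
  // Hypothetical stand-in for a downstream sink that adds an attribute to the event.
  static void tag(Map<String, byte[]> attrs) {
    attrs.put("demoTag", "value".getBytes());
  }

  // Returns true if tagging the given map throws UnsupportedOperationException.
  static boolean tagFails(Map<String, byte[]> attrs) {
    try {
      tag(attrs);
      return false;
    } catch (UnsupportedOperationException ex) {
      return true;
    }
  }

  public static void main(String[] args) {
    Map<String, byte[]> backing = new HashMap<>();
    backing.put("service", "web".getBytes());

    // Read-only view, like the map seen in the stack trace.
    Map<String, byte[]> readOnly = Collections.unmodifiableMap(backing);
    System.out.println("read-only view fails: " + tagFails(readOnly));

    // A HashMap copy is mutable and independent of the original.
    Map<String, byte[]> copy = new HashMap<>(readOnly);
    System.out.println("copy fails: " + tagFails(copy));
    System.out.println("original size: " + readOnly.size() + ", copy size: " + copy.size());
  }
}
```

    In the decorator from the question, the corresponding change would be to pass new HashMap<String, byte[]>(e.getAttrs()) as the last argument when constructing e2, assuming the attribute map has that type. Because the copy is separate from the original, attributes added downstream do not show up on the original event e.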

    Reference: https://groups.google.com/a/cloudera.org/group/flume-user/browse_thread/thread/046b4a446877c8f9?pli=1