
Unable to call a UDF in Beam SQL

  • Nagesh Singh Chauhan  · 6 years ago

    I have the following Beam SQL query:

    PCollectionTuple query0 = PCollectionTuple.of(
                new TupleTag<BeamRecord>("temp2"), temp2).and(new TupleTag<BeamRecord>("temp3"), temp3);
    PCollection<BeamRecord> rec_3 = query0.apply(
                BeamSql.queryMulti("SELECT a.*, \r\n" +
                        "(case \r\n" +
                        "when a.grp > 5 then 1 \r\n" +
                        "when b.grp > 5 then 1 \r\n" +
                        "else 0 end) as flag \r\n" +
                        "from temp2 a left join \r\n" +
                        "temp3 b on a.eventid = b.eventid and b.Weekint = c1(a.Weekint)").withUdf("c1", AddS.class));
    

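    In the query above I perform a left join between the tables temp2 and temp3, and in the ON condition I call a UDF implemented by the class AddS (registered as c1). In this UDF, Weekint is treated as BIGINT. The UDF takes Weekint as input, converts it to a date, adds 7 days, and returns the result as BIGINT. Here is the UDF: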

    public static class AddS implements BeamSqlUdf {
        private static final long serialVersionUID = 1L;

        public static BigInteger eval(BigInteger input) throws ParseException {
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
            String strdate = input.toString();
            Date date1 = dateFormat.parse(strdate);
            Calendar c = Calendar.getInstance();
            c.setTime(date1);
            c.add(Calendar.DATE, 7);
            String f = c.getTime().toString();
            BigInteger x = new BigInteger(f);
            return (x);
        }
    }
    

    The error I get is:

    Exception in thread "main" java.lang.AssertionError: No assign rules for OTHER defined
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.type.SqlTypeAssignmentRules.canCastFrom(SqlTypeAssignmentRules.java:326)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.type.SqlTypeUtil.canCastFrom(SqlTypeUtil.java:863)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil$4.test(SqlUtil.java:567)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil$4.test(SqlUtil.java:527)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.runtime.PredicateImpl.apply(PredicateImpl.java:36)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.Iterators$6.computeNext(Iterators.java:617)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.Iterators.addAll(Iterators.java:366)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.Lists.newArrayList(Lists.java:163)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:438)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:371)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:245)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:223)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5053)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5040)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlCall.accept(SqlCall.java:137)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1588)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1573)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:225)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:4764)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlCall.validate(SqlCall.java:114)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:224)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:4764)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlCall.validate(SqlCall.java:114)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:3636)
    at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:2988)
    at 
    

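    I cannot figure out what causes this error. Is the UDF defined incorrectly, or is it being called incorrectly? Alternatively, could someone explain what this error means?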

    1 answer  |  last active 6 years ago
  •   Andrew Pilloud    6 years ago

    Your UDF is defined incorrectly. Beam SQL does not support the Java BigInteger type internally. If the SQL data type is BIGINT, you should use the Java Long type instead.

    (I have opened an issue to make this error easier to understand.)
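
    For illustration, here is a minimal sketch of how the UDF might look with Long in place of BigInteger. It assumes Weekint holds a date encoded as yyyyMMdd, as the SimpleDateFormat pattern in the question suggests. The Beam interface is left out so that the snippet compiles on its own; in the pipeline the class would still declare `implements BeamSqlUdf` and be registered with withUdf("c1", AddS.class) as in the question. The null check and the use of java.time are choices made for this sketch, not something the question or the answer prescribes.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Sketch only: in the real pipeline this class would also declare
// "implements BeamSqlUdf" (omitted here to stay self-contained).
class AddS {
    // Assumption: Weekint is a date encoded as yyyyMMdd, e.g. 20180101.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    // Long maps to SQL BIGINT, unlike BigInteger.
    public static Long eval(Long input) {
        if (input == null) {
            return null; // pass SQL NULL through unchanged (assumed behavior)
        }
        LocalDate date = LocalDate.parse(String.valueOf(input), FORMAT);
        // Format the shifted date back to yyyyMMdd before converting to a number.
        return Long.parseLong(date.plusDays(7).format(FORMAT));
    }
}
```

    Note that the sketch formats the shifted date back to yyyyMMdd before converting it. The original code passes Date.toString() to the BigInteger constructor, and that string is not numeric, so it would throw a NumberFormatException once the type problem is out of the way.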