代码之家  ›  专栏  ›  技术社区  ›  user60856839

使用Sparkyr完成时间序列

  •  2
  • user60856839  · 技术社区  · 6 年前

    我正在尝试在时间序列数据集中查找丢失的分钟数。我在一个小样本上为本地性能编写了一个R代码:

    test <- dfv %>% mutate(timestamp = as.POSIXct(DaySecFrom.UTC.)) %>% 
    complete(timestamp = seq.POSIXt(min(timestamp), max(timestamp), by = 'min'), ElemUID)
    

    但你不能使用 complete() 从…起 三年 spark\u tbl

    Error in UseMethod("complete_") : 
      no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
    

    以下是一些测试数据:

    ElemUID ElemName    Kind    Number  DaySecFrom(UTC) DaySecTo(UTC)
    399126817   A648/13FKO-66   DEZ     2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    483492732   A661/18FRS-97   DEZ   120.00    2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    399126819   A648/12FKO-2    DEZ    60.00    2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    399126818   A648/12FKO-1    DEZ   180.00    2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    399126816   A648/13FKO-65   DEZ     2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    398331142   A661/31OFN-1    DEZ   120.00    2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    398331143   A661/31OFN-2    DEZ     2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    483492739   A5/28FKN-65 DEZ     2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    483492735   A661/23FRS-97   DEZ    60.00    2017-07-01 23:58:00.000 2017-07-01 23:59:00.000
    

    在R的spark cluster上,是否有其他方法或解决方法来解决此任务? 我真的很高兴你的帮助!

    1 回复  |  直到 6 年前
        1
  •  2
  •   zero323 little_kid_pea    6 年前

    查找最小值和最大值作为历元时间:

    df <- copy_to(sc, tibble(id=1:4, timestamp=c(
        "2017-07-01 23:49:00.000", "2017-07-01 23:50:00.000",
        # 6 minutes gap
        "2017-07-01 23:56:00.000",
        # 1 minute gap
        "2017-07-01 23:58:00.000")
    ), "df", overwrite=TRUE)
    
    min_max <- df %>% 
      summarise(min(unix_timestamp(timestamp)), max(unix_timestamp(timestamp))) %>% 
      collect() %>% 
      unlist()
    

    从生成参考范围 min(epoch_time) max(epoch_time) + interval :

    library(glue) 
    
    query <- glue("SELECT id AS timestamp FROM RANGE({min_max[1]}, {min_max[2] + 60}, 60)") %>%
      as.character()
    
    ref <- spark_session(sc) %>% invoke("sql", query) %>% 
      sdf_register() %>%
      mutate(timestamp = from_unixtime(timestamp, "yyyy-MM-dd HH:mm:ss.SSS"))
    

    外部连接两个:

    ref %>% left_join(df, by="timestamp")
    
    # Source:   lazy query [?? x 2]
    # Database: spark_connection
       timesptamp                 id
       <chr>                   <int>
     1 2017-07-01 23:49:00.000     1
     2 2017-07-01 23:50:00.000     2
     3 2017-07-01 23:51:00.000    NA
     4 2017-07-01 23:52:00.000    NA
     5 2017-07-01 23:53:00.000    NA
     6 2017-07-01 23:54:00.000    NA
     7 2017-07-01 23:55:00.000    NA
     8 2017-07-01 23:56:00.000     3
     9 2017-07-01 23:57:00.000    NA
    10 2017-07-01 23:58:00.000     4
    # ... with more rows
    

    笔记 :

    如果您遇到与 SPARK-20145 可以将SQL查询替换为:

    spark_session(sc) %>%
      invoke("range", as.integer(min_max[1]), as.integer(min_max[2]), 60L) %>% 
      sdf_register()