scroll_search: problem in bind_rows()

Open ErikEngerd opened this issue 5 years ago • 2 comments

We have an Elasticsearch database whose documents have varying fields. I believe this can sometimes cause bind_rows() to fail with an error like "Column `thread` can't be converted from list to character". My guess is that some queries (or pages of a scroll) return different fields, so the data frames being combined do not have the same columns and bind_rows() fails.

Example: ......Error: Column `thread` can't be converted from list to character
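
The wording of the message suggests a type conflict rather than a missing column: bind_rows() fills columns that are absent from some inputs with NA, but it refuses to combine a list column with a character column of the same name. A minimal sketch with made-up data (the objects `page1`, `page2`, `ok` and `mixed` and all values are hypothetical, and the exact error text differs between dplyr versions):

```r
library(dplyr)

# Hypothetical pages of results: `thread` is a list column in one page
# and a plain character column in the other.
page1 <- tibble(id = 1:2, thread = list("main", c("pool-1", "pool-2")))
page2 <- tibble(id = 3L, thread = "worker")

# Differing column sets alone are fine: absent columns are filled with NA.
ok <- bind_rows(
  tibble(id = 1L, thread = "main"),
  tibble(id = 2L, level = "INFO")
)

# Same column name with list vs. character type: bind_rows() signals an error.
mixed <- tryCatch(bind_rows(page1, page2), error = function(e) e)
inherits(mixed, "error")
```

If this is what happens here, a field such as `thread` probably holds a single value in most documents and several values (or nothing) in a few, so it is parsed as a character column on one page and as a list column on another.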

In my code I worked around it by retrieving the data in small chunks and concatenating the data frames with plyr::rbind.fill() instead of bind_rows(), but then I have to choose the retrieval time interval. Choosing a smaller interval does not always work either, so the workaround is not 100% reliable.

Below is some code showing what I am doing (it is not a complete example):

getElasticDataInTimeInterval = function(hostname, t0, t1, fields, 
                                        url = "http://localhost:9200",
                                        deltatminutes = 5) { 
  if (!is.POSIXct(t0)) { 
    t0 = getTime(t0)
  }
  if (!is.POSIXct(t1)) { 
    t1 = getTime(t1)
  }
  # workaround for problem in elasticsearchr
  # Because of paging, it can happen that in different pages
  # the fields differ. But then bind_rows() will fail since the columns
  # do not match. Therefore, we query smaller time intervals to avoid paging
  # and use plyr::rbind.fill() instead to avoid this issue. 
  timeintervals = seq(t0, t1, deltatminutes*60)
  res = list()
  for (i in seq_len(length(timeintervals) - 1)) { 
    t0 = timeintervals[[i]]
    t1 = timeintervals[[i+1]]
    
    dt0 = date(with_tz(t0, "UTC"))
    dt1 = date(with_tz(t1, "UTC"))
    dates = seq(dt0, dt1, 1)
    
    
    ressingle = lapply(dates, function(date) {
      cat("Fetching data for [", paste(t0), ",", paste(t1), "> \n")
      results = elastic(url, getElasticIndex(date), "doc") %search% (
        filterHostAndTime(hostname, t0, t1) + 
          do.call(selectFields, as.list(fields))
      )
      cat('ok\n')
      results
    })
    res = c(res, ressingle)
  }

  res %>% 
    #bind_rows() %>%
    plyr::rbind.fill() %>%
    rename(log_timestamp = '@timestamp') %>%
    mutate(log_timestamp  = with_tz(parse_datetime(log_timestamp), tzone = "Europe/Amsterdam"), 
           time.start = as.POSIXct(strptime(str_replace(time.start, ",", "."), "%Y-%m-%d %H:%M:%OS")),
           time.end   = as.POSIXct(strptime(str_replace(time.end, ",", "."),  "%Y-%m-%d %H:%M:%OS"))) %>%
    arrange(timestamp)
}
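
A possible alternative to shrinking the time interval is to normalise column types before combining the per-query results. The following is only a sketch in base R under stated assumptions: `flatten_list_columns()` and `bind_pages()` are hypothetical helpers that are not part of elasticsearchr, and collapsing multi-valued fields with ";" is an arbitrary choice that loses the original structure.

```r
# Hypothetical helper: turn every list column into a character column by
# collapsing each cell to one string (empty cells become NA).
flatten_list_columns <- function(df) {
  is_list_col <- vapply(df, is.list, logical(1))
  for (nm in names(df)[is_list_col]) {
    df[[nm]] <- vapply(df[[nm]], function(cell) {
      if (length(cell) == 0) NA_character_ else paste(as.character(cell), collapse = ";")
    }, character(1), USE.NAMES = FALSE)
  }
  df
}

# Hypothetical helper: flatten each page, add columns missing from a page
# as NA, then row-bind all pages with a common column order.
bind_pages <- function(pages) {
  pages <- lapply(pages, flatten_list_columns)
  all_cols <- unique(unlist(lapply(pages, names)))
  pages <- lapply(pages, function(df) {
    for (nm in setdiff(all_cols, names(df))) df[[nm]] <- rep(NA, nrow(df))
    df[all_cols]
  })
  do.call(rbind, pages)
}

# Made-up pages with a mixed-type `thread` field and varying columns.
page1 <- data.frame(id = 1:2, stringsAsFactors = FALSE)
page1$thread <- list("main", c("a", "b"))
page2 <- data.frame(id = 3L, thread = "worker", stringsAsFactors = FALSE)
page3 <- data.frame(id = 4L, level = "INFO", stringsAsFactors = FALSE)

combined <- bind_pages(list(page1, page2, page3))
```

In the function above, `res` could be passed through such a helper instead of plyr::rbind.fill(). That only helps when the individual `%search%` calls succeed, because the error in the report is raised inside the package while it combines scroll pages.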

ErikEngerd · Jul 08 '20 12:07

A stack trace of the error is as follows:

where 1: eval(substitute(browser(skipCalls = pos), list(pos = (length(sys.frames()) - 
    frame) + 2)), envir = sys.frame(frame))
where 2: eval(substitute(browser(skipCalls = pos), list(pos = (length(sys.frames()) - 
    frame) + 2)), envir = sys.frame(frame))
where 3: .rs.breakOnError(TRUE)
where 4: (function () 
{
    .rs.breakOnError(TRUE)
})()
where 5: stop(list(message = "Column `thread` can't be converted from list to character", 
    call = NULL, cppstack = NULL))
where 6: bind_rows_(x, .id)
where 7: dplyr::bind_rows(scroll_results)
where 8: as.data.frame(dplyr::bind_rows(scroll_results), stringsAsFactors = FALSE)
where 9: scroll_search(rescource, api_call_payload)
where 10: `%search%.elastic`(elastic(url, getElasticIndex(date), "doc"), 
    (filterHostAndTime(hostname, t0, t1) + do.call(selectFields, 
        as.list(fields))))
where 11 at elasticsupport.R#71: elastic(url, getElasticIndex(date), "doc") %search% (filterHostAndTime(hostname, 
    t0, t1) + do.call(selectFields, as.list(fields)))
where 12: FUN(X[[i]], ...)
where 13 at elasticsupport.R#69: lapply(dates, function(date) {
    cat("Fetching data for [", paste(t0), ",", paste(t1), "> \n")
    results = elastic(url, getElasticIndex(date), "doc") %search% 
        (filterHostAndTime(hostname, t0, t1) + do.call(selectFields, 
            as.list(fields)))
    cat("ok\n")
    results
})
where 14: getElasticDataInTimeInterval(hostname, paste(date, t0), paste(date, 
    t1), "*", deltatminutes = 5)

The problem occurs with elasticsearchr version 0.3.1

ErikEngerd · Jul 08 '20 12:07

The failing call is most likely on line 404 of utils.R, where scroll_search() calls dplyr::bind_rows(scroll_results).

ErikEngerd · Jul 08 '20 13:07