elasticsearchr
elasticsearchr copied to clipboard
scroll_search: problem in bind_rows()
We have an elasticsearch database that contains documents with varying fields. I believe this sometimes can result in bind_rows() failing with an error like 'bind_rows column can't be converted from list to character'. I believe this is caused by some queries returning different fields and then bind_rows() fails because the dataframes do not have the same columns.
Example:
......Error: Column threadcan't be converted from list to character
In my code I worked around it by retrieving the data in small chunks and then concatening the dataframes using plyr::rbind.fill() instead, but then I have to choose the retrieval time interval. Here choosing a lower interval also doesn't always work so it is not 100% reliable.
Below is some code showing what I am doing (it is not a complete example):
getElasticDataInTimeInterval = function(hostname, t0, t1, fields,
url = "http://localhost:9200",
deltatminutes = 5) {
if (!is.POSIXct(t0)) {
t0 = getTime(t0)
}
if (!is.POSIXct(t1)) {
t1 = getTime(t1)
}
# workaround for problem in elasticsearchr
# Because of paging, it can happen that in different pages
# the fields differ. But then bind_rows() will fail since the columns
# do not match. Therefore, we query smaller time intervals to avoid paging
# and use plyr::rbind.fill() instead to avoid this issue.
timeintervals = seq(t0, t1, deltatminutes*60)
res = list()
for (i in 1:(length(timeintervals)-1)) {
t0 = timeintervals[[i]]
t1 = timeintervals[[i+1]]
dt0 = date(with_tz(t0, "UTC"))
dt1 = date(with_tz(t1, "UTC"))
dates = seq(dt0, dt1, 1)
ressingle = lapply(dates, function(date) {
cat("Fetching data for [", paste(t0), ",", paste(t1), "> \n")
results = elastic(url, getElasticIndex(date), "doc") %search% (
filterHostAndTime(hostname, t0, t1) +
do.call(selectFields, as.list(fields))
)
cat('ok\n')
results
})
res = c(res, ressingle)
}
res %>%
#bind_rows() %>%
plyr::rbind.fill() %>%
rename(log_timestamp = '@timestamp') %>%
mutate(log_timestamp = with_tz(parse_datetime(log_timestamp), tzone = "Europe/Amsterdam"),
time.start = as.POSIXct(strptime(str_replace(time.start, ",", "."), "%Y-%m-%d %H:%M:%OS")),
time.end = as.POSIXct(strptime(str_replace(time.end, ",", "."), "%Y-%m-%d %H:%M:%OS"))) %>%
arrange(timestamp)
}
A stack trace of the error is as follows:
where 1: eval(substitute(browser(skipCalls = pos), list(pos = (length(sys.frames()) -
frame) + 2)), envir = sys.frame(frame))
where 2: eval(substitute(browser(skipCalls = pos), list(pos = (length(sys.frames()) -
frame) + 2)), envir = sys.frame(frame))
where 3: .rs.breakOnError(TRUE)
where 4: (function ()
{
.rs.breakOnError(TRUE)
})()
where 5: stop(list(message = "Column `thread` can't be converted from list to character",
call = NULL, cppstack = NULL))
where 6: bind_rows_(x, .id)
where 7: dplyr::bind_rows(scroll_results)
where 8: as.data.frame(dplyr::bind_rows(scroll_results), stringsAsFactors = FALSE)
where 9: scroll_search(rescource, api_call_payload)
where 10: `%search%.elastic`(elastic(url, getElasticIndex(date), "doc"),
(filterHostAndTime(hostname, t0, t1) + do.call(selectFields,
as.list(fields))))
where 11 at elasticsupport.R#71: elastic(url, getElasticIndex(date), "doc") %search% (filterHostAndTime(hostname,
t0, t1) + do.call(selectFields, as.list(fields)))
where 12: FUN(X[[i]], ...)
where 13 at elasticsupport.R#69: lapply(dates, function(date) {
cat("Fetching data for [", paste(t0), ",", paste(t1), "> \n")
results = elastic(url, getElasticIndex(date), "doc") %search%
(filterHostAndTime(hostname, t0, t1) + do.call(selectFields,
as.list(fields)))
cat("ok\n")
results
})
where 14: getElasticDataInTimeInterval(hostname, paste(date, t0), paste(date,
t1), "*", deltatminutes = 5)
The problem occurs with elasticsearchr version 0.3.1
It is most likely line 404 of utils.R