S3 Select not working
Hi, I am getting the following error when I use select_object():
[1] "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>MalformedXML</Code><Message>The XML provided was not well-formed or did not validate against our published schema. Please check the service documentation and try again.</Message><RequestId>9F45EFB54C52001B</RequestId><HostId>oVpCc6ZxF01w3JeISl9rGxYl8RwzWT0VeKuXc3nxmVaPWCPbG3KDc1i8Gzhm4etAgrbt+1D6WEI=</HostId></Error>"
I am trying to query a CSV file, and I tried the most basic query:
aaa <- select_object(object = "MYOBJECTPATH.csv", bucket = "MYBUCKET", request_body = "select * from s3object s")
What am I doing wrong here?
Edit: As a follow-up, I tried the request body from here, to no avail:
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html
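Roughly this shape (sketch; the expression is the same basic query as above):
<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Expression>select * from s3object s</Expression>
  <ExpressionType>SQL</ExpressionType>
  <InputSerialization>
    <CSV>
      <FileHeaderInfo>USE</FileHeaderInfo>
    </CSV>
  </InputSerialization>
  <OutputSerialization>
    <CSV/>
  </OutputSerialization>
</SelectObjectContentRequest>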
@leeper I am running into the same issue (I am using a CSV-to-CSV request body from the AWS website). Would you be able to look into this?
I know this is old, but I have a similar problem. Although I am able to get the request to complete successfully, I am unable to parse the response from the raw vector into anything useful: rawToChar() complains about embedded nuls, read_xml() says the document is empty, and manually removing the nulls (00 bytes) from the raw vector does not help. Has anyone gotten this to work?
Code to execute:
library(aws.s3)
library(xml2)
req_list <-
  list(
    SelectRequest = list(
      Expression = list("SELECT * FROM S3Object"),
      ExpressionType = list("SQL"),
      InputSerialization = list(
        CSV = list(
          FileHeaderInfo = list("USE")
        )
      ),
      OutputSerialization = list(
        CSV = list()
      )
    )
  )

xml_doc <- as_xml_document(req_list)

select_object(
  object = "mtcars.csv",
  bucket = "bucket",
  request_body = as.character(xml_doc)
) -> req
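The failing parse attempts described above look roughly like this (sketch):
# none of these get anywhere with the raw bytes that come back:
rawToChar(req)                      # errors about an embedded nul in the string
read_xml(req)                       # reports that the document is empty
rawToChar(req[req != as.raw(0)])    # dropping the 00 bytes by hand doesn't help either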
I'm starting on this too...
I've got some looping and header parsing to do but basically:
contentRequest <- xml_new_root("SelectObjectContentRequest",
                               xmlns = "http://s3.amazonaws.com/doc/2006-03-01/") %>%
  xml_add_child("Expression", "SELECT * FROM s3Object s") %>%
  xml_add_sibling("ExpressionType", "SQL") %>%
  xml_add_sibling("InputSerialization") %>%
  xml_add_child("CSV") %>%
  xml_add_child("FileHeaderInfo", "USE") %>%
  xml_parent() %>%
  xml_parent() %>%
  xml_add_sibling("OutputSerialization") %>%
  xml_add_child("CSV") %>%
  xml_add_child("FileHeaderInfo", "USE") %>%
  xml_root() %>%
  as.character()

select_object(
  bucket = s3_bucket_name,
  object = csv_path,
  request_body = contentRequest
) -> req
# spec on https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
connect <- rawConnection(req, "r")
# there are several chunks that look like:
fullSize <- readBin(connect, "integer", size=4, endian = "big")
headerSize <- readBin(connect, "integer", size=4, endian = "big")
ignoredCrc <- readBin(connect, "integer", size=4, endian = "big")
payloadSize <- fullSize - headerSize - 16
# should read the headers here...
seek(connect, where=headerSize, origin="current")
text <- readChar(connect, payloadSize)
ignoredCrc2 <- readBin(connect, "integer", size=4, endian = "big")
Will try to finish this and make a helper method or PR soon...
Here's a bit more....
library(tidyverse)
library(lubridate)
library(RedditExtractoR)
library(aws.s3)
library(arrow)
library(xml2)
s3_bucket_name <- ".."
s3_path_root <- ".."
csv_path <- paste0(s3_path_root, "/", current_term, ".csv")
contentRequest <- xml_new_root("SelectObjectContentRequest",
                               xmlns = "http://s3.amazonaws.com/doc/2006-03-01/") %>%
  xml_add_child("Expression", "SELECT * FROM s3Object s") %>%
  xml_add_sibling("ExpressionType", "SQL") %>%
  xml_add_sibling("InputSerialization") %>%
  xml_add_child("CSV") %>%
  xml_add_child("FileHeaderInfo", "USE") %>%
  xml_parent() %>%
  xml_parent() %>%
  xml_add_sibling("OutputSerialization") %>%
  xml_add_child("CSV") %>% # or "JSON"
  xml_add_child("FileHeaderInfo", "USE") %>%
  xml_root() %>%
  as.character()

req <- select_object(
  bucket = s3_bucket_name,
  object = csv_path,
  request_body = contentRequest
)
# parse using spec on https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
messages <- list()
connect <- rawConnection(req, "r")

while (seek(connect) < length(req)) {
  messageStart <- seek(connect)
  fullSize <- readBin(connect, "integer", size=4, endian = "big")
  headerSize <- readBin(connect, "integer", size=4, endian = "big")
  ignoredCrc <- readBin(connect, "integer", size=4, endian = "big")
  payloadSize <- fullSize - headerSize - 16

  message <- list()
  while (seek(connect) < messageStart + headerSize + 12) {
    if (seek(connect) >= length(req)) {
      stop("Parse error - we have exceeded the total response body length while parsing headers")
    }
    headerNameLen <- readBin(connect, "integer", size=1, endian = "big")
    headerName <- readChar(connect, headerNameLen)
    headerValueType <- readBin(connect, "integer", size=1, endian = "big")
    headerValueLen <- readBin(connect, "integer", size=2, endian = "big")
    headerValue <- readChar(connect, headerValueLen)
    #message("Header '", headerName, "' was '", headerValue, "'")
    if (headerName == "payload") {
      stop("Unexpected headerName - header cannot be called 'payload'")
    }
    message[headerName] <- headerValue
  }

  message$payload <- readBin(connect, "raw", n=payloadSize)
  ignoredCrc2 <- readBin(connect, "integer", size=4, endian = "big")
  #text <- readChar(connect, payloadSize)
  #message("Text seen", text)

  messages <- c(messages, list(message))

  if (seek(connect) > length(req)) {
    stop("Parse error - we have exceeded the total response body length while reading the body")
  }
}

if (seek(connect) < length(req)) {
  stop("Parse error - we stopped parsing early - current position ", seek(connect), " total length ", length(req))
}
if (seek(connect) > length(req)) {
  stop("Parse error - we overran during parsing - current position ", seek(connect), " total length ", length(req))
}

close.connection(connect)
lastMessage <-
  messages %>%
  tail(1) %>%
  unlist()

if (lastMessage[[":message-type"]] != "event"
    || lastMessage[[":event-type"]] != "End") {
  stop("Our stream did not end with an End event, instead it ended with ", lastMessage)
}
payloads <-
  messages %>%
  purrr::keep(~ .x[":message-type"] == "event") %>%
  purrr::keep(~ .x[":event-type"] == "Records") %>%
  map("payload")

# for csv content...
text_payloads <-
  payloads %>%
  map_chr(~ readChar(.x, nchars = length(.x)))

csvContent <- readr::read_csv(text_payloads, col_names = F)
# for json content...
# more work needed here - need to cope with the json format from SELECT (which uses spaces between records)
# also need to look at the text encoding stuff...
#text_payloads <-
# payloads %>%
# map_chr(~ iconv(readBin(.x, character()), to ="UTF-8"))#
# map_chr(~ rawToChar(.x, multiple = T),)
#str(text_payloads)
#jsonContent <- jsonlite::fromJSON(text_payloads)
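A rough sketch for the JSON route, assuming OutputSerialization is switched to JSON and the records come back newline-delimited (which I believe is the default RecordDelimiter):
# sketch only: treats the concatenated Records payloads as newline-delimited JSON
json_text <-
  payloads %>%
  map_chr(rawToChar) %>%
  paste0(collapse = "")
jsonContent <- jsonlite::stream_in(textConnection(json_text), verbose = FALSE)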
This code needs more testing before prod use 👍
Will think about where/how to supply this as a PR... also very happy to have others contribute on text decoding stuff... or if you fancy writing the CRC checks 👍
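For the CRC part, a minimal sketch, assuming the digest package and my reading of the event-stream spec linked above (prelude CRC-32 covers the first 8 bytes of a message; the trailing CRC-32 covers everything except itself):
library(digest)

# sketch: verify the two CRC-32 checksums in one message, given the raw response
# body `req`, the message start offset and the message's total length - e.g.
# inside the loop above: check_message_crcs(req, messageStart, fullSize)
pad8 <- function(x) paste0(strrep("0", 8 - nchar(x)), x)
crc32_hex <- function(bytes) pad8(digest(bytes, algo = "crc32", serialize = FALSE))

check_message_crcs <- function(req, messageStart, fullSize) {
  msg <- req[(messageStart + 1):(messageStart + fullSize)]

  # prelude CRC (bytes 9-12) should match the CRC-32 of the first 8 bytes
  preludeCrc <- paste(as.character(msg[9:12]), collapse = "")
  if (crc32_hex(msg[1:8]) != preludeCrc) stop("Prelude CRC mismatch")

  # message CRC (last 4 bytes) should match the CRC-32 of everything before it
  messageCrc <- paste(as.character(msg[(fullSize - 3):fullSize]), collapse = "")
  if (crc32_hex(msg[1:(fullSize - 4)]) != messageCrc) stop("Message CRC mismatch")

  invisible(TRUE)
}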
A package is at https://github.com/slodge/aws.s3.select - I'm also working on a PR for here ... and will ask how a larger package might be integrated...
PR is https://github.com/cloudyr/aws.s3/pull/377