
S3 Select not working

Open berkorbay opened this issue 6 years ago • 5 comments

Hi, I am getting the following error when I use select_object().

[1] "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>MalformedXML</Code><Message>The XML provided was not well-formed or did not validate against our published schema. Please check the service documentation and try again.</Message><RequestId>9F45EFB54C52001B</RequestId><HostId>oVpCc6ZxF01w3JeISl9rGxYl8RwzWT0VeKuXc3nxmVaPWCPbG3KDc1i8Gzhm4etAgrbt+1D6WEI=</HostId></Error>"

I am trying to query a CSV file and I tried the most basic query.

aaa <- select_object(object = "MYOBJECTPATH.csv", bucket = "MYBUCKET", request_body = "select * from s3object s")

What am I doing wrong here?

Edit: As a follow-up, I tried the request body from the page below, to no avail.

https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html

berkorbay avatar Jan 07 '19 09:01 berkorbay
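
The MalformedXML error above is consistent with request_body being a bare SQL string: the S3 SelectObjectContent API expects an XML document that wraps the expression. Below is a minimal sketch of building such a body with xml2. build_select_request is a hypothetical helper (it is not part of aws.s3), and the element names are taken from the requests posted further down this thread, so treat it as an illustration rather than a verified fix.

```r
library(xml2)

# Hypothetical helper (not part of aws.s3): wrap a SQL expression in the XML
# envelope used by the SelectObjectContent requests shown in this thread.
build_select_request <- function(sql, header_info = "USE") {
  root <- xml_new_root("SelectObjectContentRequest",
                       xmlns = "http://s3.amazonaws.com/doc/2006-03-01/")
  xml_add_child(root, "Expression", sql)
  xml_add_child(root, "ExpressionType", "SQL")

  # Input is CSV; header_info says how the first line of the object is treated.
  input <- xml_add_child(root, "InputSerialization")
  csv_in <- xml_add_child(input, "CSV")
  xml_add_child(csv_in, "FileHeaderInfo", header_info)

  # Output is CSV with default settings.
  output <- xml_add_child(root, "OutputSerialization")
  xml_add_child(output, "CSV")

  as.character(root)
}

body <- build_select_request("SELECT * FROM s3object s")
cat(body)
```

The resulting string could then be passed as request_body to select_object().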

@leeper I am running into the same issue (I am using a CSV-to-CSV request body from the AWS website). Would you be able to look into this?

slava-kohut avatar Apr 16 '19 13:04 slava-kohut

I know this is old, but I have a similar problem. Although I am able to get the request to complete successfully, I am unable to parse the raw vector in the response into anything. rawToChar() complains about embedded nulls, and read_xml() says the document is empty. Removing nulls (00s) manually from the raw vector does not help. Has anyone gotten this to work?

Code to execute:

library(aws.s3)
library(xml2)

req_list <-
  list(
    SelectRequest = list(
      Expression = list("SELECT * FROM S3Object"),
      ExpressionType = list("SQL"),
      InputSerialization = list(
        CSV = list(
          FileHeaderInfo = list("USE")
        )
      ),
      OutputSerialization = list(
        CSV = list()
      )
    )
  )

xml_doc <- as_xml_document(req_list)

select_object(
  object = "mtcars.csv",
  bucket = "bucket",
  request_body = as.character(xml_doc)
) -> req

ndiquattro avatar Aug 15 '20 21:08 ndiquattro
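
A likely reason rawToChar() and read_xml() fail is that the response body is not text or XML but a binary event stream (the format described in the appendix linked later in this thread). Each message starts with a 12-byte prelude of big-endian integers, so zero bytes are part of the framing itself. The sketch below uses a synthetic prelude with made-up sizes, not a real S3 response, to show the effect and how the sizes can be read back.

```r
# Synthetic 12-byte prelude (made-up sizes): total length 100, headers length 40,
# and a 4-byte CRC left as zeros because this sketch does not compute it.
prelude <- c(
  writeBin(100L, raw(), size = 4, endian = "big"),
  writeBin(40L, raw(), size = 4, endian = "big"),
  raw(4)
)

# The framing contains zero bytes, so rawToChar() raises an error on it.
decode_attempt <- tryCatch(rawToChar(prelude),
                           error = function(e) conditionMessage(e))

# Reading the same bytes as big-endian integers recovers the sizes.
con <- rawConnection(prelude, "r")
total_len  <- readBin(con, "integer", size = 4, endian = "big")
header_len <- readBin(con, "integer", size = 4, endian = "big")
close(con)

# 12 bytes of prelude plus 4 bytes of trailing message CRC are overhead.
payload_len <- total_len - header_len - 16L
```

Stripping the zero bytes by hand does not help because it shifts every length and offset that the rest of the message depends on.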

I'm starting on this too...

I've got some looping and header parsing to do but basically:

contentRequest <- xml_new_root("SelectObjectContentRequest",
             xmlns = "http://s3.amazonaws.com/doc/2006-03-01/") %>%
  xml_add_child("Expression", "SELECT * FROM s3Object s")  %>%
  xml_add_sibling("ExpressionType", "SQL")  %>%
  xml_add_sibling("InputSerialization")  %>%
    xml_add_child("CSV")  %>%
      xml_add_child("FileHeaderInfo", "USE")  %>%
      xml_parent() %>%
    xml_parent() %>%
  xml_add_sibling("OutputSerialization")  %>%
    xml_add_child("CSV")  %>%
      xml_add_child("FileHeaderInfo", "USE")  %>%
  xml_root() %>%
  as.character()

select_object(
  bucket = s3_bucket_name, 
  object = csv_path,
  request_body =  contentRequest 
) -> req

# spec on https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
connect <- rawConnection(req, "r")

# there are several chunks that look like:
fullSize <- readBin(connect, "integer", size=4, endian = "big")
headerSize <- readBin(connect, "integer", size=4, endian = "big")
ignoredCrc <- readBin(connect, "integer", size=4, endian = "big")
payloadSize <- fullSize - headerSize - 16
# should read the headers here...
seek(connect, where=headerSize, origin="current")
text <- readChar(connect, payloadSize)
ignoredCrc2 <- readBin(connect, "integer", size=4, endian = "big")

Will try to finish this and make a helper method or PR soon...

slodge avatar Sep 25 '20 17:09 slodge
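
The "should read the headers here" step above can be sketched on a synthetic message. The layout assumed here is the one used by the parsing code in this thread: a 1-byte name length, the name, a 1-byte value type (7 is assumed to mean string), a 2-byte big-endian value length, then the value. encode_header and read_headers are hypothetical helpers, and both CRC fields are left as zero placeholders rather than computed.

```r
# Hypothetical helper: encode one string header in the assumed wire layout.
encode_header <- function(name, value) {
  c(as.raw(nchar(name, "bytes")), charToRaw(name),
    as.raw(7),  # value type, assumed to be 7 for strings
    writeBin(nchar(value, "bytes"), raw(), size = 2, endian = "big"),
    charToRaw(value))
}

# Hypothetical helper: read headers until header_len bytes have been consumed.
read_headers <- function(con, header_len) {
  out <- list()
  consumed <- 0L
  while (consumed < header_len) {
    name_len <- readBin(con, "integer", size = 1, signed = FALSE)
    name <- rawToChar(readBin(con, "raw", n = name_len))
    value_type <- readBin(con, "integer", size = 1, signed = FALSE)
    value_len <- readBin(con, "integer", size = 2, signed = FALSE, endian = "big")
    out[[name]] <- rawToChar(readBin(con, "raw", n = value_len))
    consumed <- consumed + 1L + name_len + 1L + 2L + value_len
  }
  out
}

# Build a synthetic message: prelude, headers, payload, trailing CRC placeholder.
headers <- c(encode_header(":message-type", "event"),
             encode_header(":event-type", "Records"))
payload <- charToRaw("a,b\n")
msg <- c(
  writeBin(12L + length(headers) + length(payload) + 4L, raw(), size = 4, endian = "big"),
  writeBin(length(headers), raw(), size = 4, endian = "big"),
  raw(4),  # prelude CRC placeholder (not computed in this sketch)
  headers, payload,
  raw(4)   # message CRC placeholder (not computed in this sketch)
)

# Parse it back.
con <- rawConnection(msg, "r")
total_len  <- readBin(con, "integer", size = 4, endian = "big")
header_len <- readBin(con, "integer", size = 4, endian = "big")
invisible(readBin(con, "raw", n = 4))  # skip the prelude CRC
hdrs <- read_headers(con, header_len)
payload_out <- readBin(con, "raw", n = total_len - header_len - 16L)
close(con)
```

Counting consumed bytes by hand avoids relying on seek() for the header loop; real responses would also need the CRC fields checked.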

Here's a bit more....

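library(tidyverse)  # provides %>%, purrr and readr, all used below
library(lubridate)        # not used in this snippet
library(RedditExtractoR)  # not used in this snippet
library(aws.s3)
library(arrow)            # not used in this snippet
library(xml2)

s3_bucket_name <- ".."
s3_path_root <- ".."

# current_term is not defined in this snippet; set it before running
csv_path <- paste0(s3_path_root, "/", current_term, ".csv")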

contentRequest <- xml_new_root("SelectObjectContentRequest",
                      xmlns = "http://s3.amazonaws.com/doc/2006-03-01/") %>%
  xml_add_child("Expression", "SELECT * FROM s3Object s")  %>%
  xml_add_sibling("ExpressionType", "SQL")  %>%
  xml_add_sibling("InputSerialization")  %>%
  xml_add_child("CSV")  %>%
  xml_add_child("FileHeaderInfo", "USE")  %>%
  xml_parent() %>%
  xml_parent() %>%
  xml_add_sibling("OutputSerialization")  %>%
  xml_add_child("CSV")  %>% # or "JSON"
  xml_add_child("FileHeaderInfo", "USE")  %>%
  xml_root() %>%
  as.character()

req <- select_object(
  bucket = s3_bucket_name, 
  object = csv_path,
  request_body =  contentRequest 
) 


# parse using spec on https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
messages <- list()
connect <- rawConnection(req, "r")
while(seek(connect) < length(req)) {
  messageStart <- seek(connect)
  fullSize <- readBin(connect, "integer", size=4, endian = "big")
  headerSize <- readBin(connect, "integer", size=4, endian = "big")
  ignoredCrc <- readBin(connect, "integer", size=4, endian = "big")
  payloadSize <- fullSize - headerSize - 16
  
  message <- list()
  
  while (seek(connect) < messageStart + headerSize + 12) {
    if (seek(connect) >= length(req)) {
      stop("Parse error - we have exceeded the total response body length while parsing headers")
    }
      
    # length fields are read as unsigned so that large values do not come back negative
    headerNameLen <- readBin(connect, "integer", size=1, signed=FALSE, endian = "big")
    headerName <- readChar(connect, headerNameLen)
    headerValueType <- readBin(connect, "integer", size=1, signed=FALSE, endian = "big")
    headerValueLen <-  readBin(connect, "integer", size=2, signed=FALSE, endian = "big")
    headerValue <- readChar(connect, headerValueLen)
    #message("Header '", headerName, "' was '", headerValue, "'")
    
    if (headerName == "payload") {
      stop("Unexpected headerName - header cannot be called 'payload'")
    }
    
    message[headerName] <- headerValue
  }
  
  message$payload <- readBin(connect, "raw", n=payloadSize)
  ignoredCrc2 <- readBin(connect, "integer", size=4, endian = "big")
  #text <- readChar(connect, payloadSize)
  #message("Text seen", text)
  
  messages <- c(messages, list(message))
  
  if (seek(connect) > length(req)) {
    stop("Parse error - we have exceeded the total response body length while reading the body")
  }
}
if (seek(connect) < length(req)) {
  stop("Parse error - we stopped parsing early - current position ", seek(connect), " total length ", length(req))
}
if (seek(connect) > length(req)) {
  stop("Parse error - we overran during parsing - current position ", seek(connect), " total length ", length(req))
}
close.connection(connect)

lastMessage <-
  messages %>% 
  tail(1) %>% 
  unlist()

if (lastMessage[[":message-type"]] != "event"
    || lastMessage[[":event-type"]] != "End") {
  stop("Our stream did not end with an End event, instead it ended with ", lastMessage)
}

payloads <-
  messages %>% 
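  # [[ ]] plus identical() yields a single TRUE/FALSE even when a header is missing
  purrr::keep(~ identical(.x[[":message-type"]], "event")) %>% 
  purrr::keep(~ identical(.x[[":event-type"]], "Records")) %>% 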
  map("payload")

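# for csv content...
# join the chunks before parsing: a Records event can hold a partial record
text_payloads <-
  payloads  %>% 
  map_chr(~ readChar(.x, n = length(.x), useBytes = TRUE)) %>% 
  paste(collapse = "")

# with readr >= 2.0, prefer wrapping literal data in I(): read_csv(I(text_payloads), ...)
csvContent <- readr::read_csv(text_payloads, col_names = FALSE)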

# for json content...
# more work needed here - need to cope with the json format from SELECT (which uses spaces between records)
# also need to look at the text encoding stuff...

#text_payloads <-
#  payloads  %>% 
# map_chr(~ iconv(readBin(.x, character()),  to ="UTF-8"))#
#  map_chr(~ rawToChar(.x, multiple = T),)

#str(text_payloads)
#jsonContent <- jsonlite::fromJSON(text_payloads)

This code needs more testing before prod use 👍

Will think about where/how to supply this as a PR... also very happy to have others contribute on text decoding stuff... or if you fancy writing the CRC checks 👍

slodge avatar Sep 27 '20 10:09 slodge
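
On the text decoding question: one approach is to concatenate the raw Records payloads before converting them to text, because a chunk boundary can fall inside a record or even inside a multi-byte character. The sketch below uses synthetic payloads in place of the `payloads` list built above and assumes the object is UTF-8 encoded; it uses base R only.

```r
# Synthetic stand-in for the payloads list: one CSV body ("1,cafe" with an
# e-acute, then "2,tea") split in the middle of the two-byte UTF-8 e-acute.
full <- c(charToRaw("1,caf"), as.raw(c(0xc3, 0xa9)), charToRaw("\n2,tea\n"))
payloads <- list(full[1:6], full[7:length(full)])

# Decoding each chunk separately leaves an incomplete UTF-8 sequence in chunk 1.
chunk1_valid <- validUTF8(rawToChar(payloads[[1]]))

# Concatenate the raw vectors first, then convert once and mark the encoding.
joined <- do.call(c, payloads)
text <- rawToChar(joined)
Encoding(text) <- "UTF-8"

# Parse the joined text; the select output above carries no header row.
csv_content <- utils::read.csv(text = text, header = FALSE,
                               stringsAsFactors = FALSE)
```

The same joined text could be handed to readr::read_csv() instead of read.csv() if the tidyverse pipeline above is preferred.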

A package is at https://github.com/slodge/aws.s3.select - I am also working on a PR for this repo ... and will ask how the larger package might be integrated...

PR is https://github.com/cloudyr/aws.s3/pull/377

slodge avatar Oct 13 '20 14:10 slodge