Allow list_objects_v2 to retrieve >1,000 Keys

Open TenanATC opened this issue 4 years ago • 3 comments

Is it possible to support the retrieval of more than 1,000 keys via recursive requests similar to the "get_bucket" function from the 'aws.s3' package?

TenanATC avatar Feb 22 '22 17:02 TenanATC

Hey, sorry, I don’t have time right now to give you a fully fleshed-out answer, but this other answer may help: https://github.com/paws-r/paws/issues/30#issuecomment-731856133. It repeatedly requests the objects in chunks until it has all of them. You will have to modify it to use the list objects continuation token, however. Sorry again, and I hope this helps for the time being.
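
A minimal sketch of that modification, assuming the `list_objects_v2` response fields `Contents`, `IsTruncated`, and `NextContinuationToken`. The names `collect_keys`, `fetch_page`, and the fake pages are hypothetical and only for illustration; the fake pager stands in for S3 so the loop can run without credentials.

```r
# Follow NextContinuationToken until IsTruncated is no longer TRUE.
# `fetch_page` is a hypothetical callback: it takes a token (NULL for the
# first request) and returns one list_objects_v2-style response.
collect_keys <- function(fetch_page) {
  keys <- character(0)
  token <- NULL
  repeat {
    resp <- fetch_page(token)
    page_keys <- vapply(resp$Contents, function(obj) obj$Key, character(1))
    keys <- c(keys, page_keys)
    if (!isTRUE(resp$IsTruncated)) break
    token <- resp$NextContinuationToken
  }
  keys
}

# Fake two-page listing used in place of a real bucket (assumption).
fake_pages <- list(
  list(
    Contents = list(list(Key = "a.txt"), list(Key = "b.txt")),
    IsTruncated = TRUE,
    NextContinuationToken = "2"
  ),
  list(
    Contents = list(list(Key = "c.txt")),
    IsTruncated = FALSE
  )
)

fake_fetch <- function(token) {
  page <- if (is.null(token)) 1L else as.integer(token)
  fake_pages[[page]]
}

all_keys <- collect_keys(fake_fetch)

# Against a real bucket (not run here; the bucket name is a placeholder):
# svc <- paws::s3()
# all_keys <- collect_keys(function(token) {
#   svc$list_objects_v2(Bucket = "my-bucket", ContinuationToken = token)
# })
```

Passing the request as a callback keeps the paging loop separate from the client, so the same loop can be exercised with fake pages as above.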

davidkretch avatar Mar 02 '22 03:03 davidkretch

@TenanATC I put together this function, which lists objects by taking an s3$list_objects function call. For ease of use, it accepts a max_pages argument to cap the number of pages fetched. I was partially inspired by https://github.com/paws-r/paws/issues/30#issuecomment-731856133 and the JavaScript example listed here: Listing object keys programmatically

list_objects_paginated <- function(f, max_retries = 5, max_pages = Inf) {
    response <- f
    result <- list(response)

    truncated <- TRUE
    pageMarker <- NULL
    pageNumber <- 1
    while (truncated && pageNumber < max_pages) {
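        truncated <- isTRUE(response$IsTruncated)

        # Last page reached: stop before another request is issued.
        if (!truncated) break

        # list_objects pages by Marker: resume after the last key received.
        pageMarker <- response$Contents[[length(response$Contents)]]$Key
        call <- substitute(f)
        call$Marker <- pageMarker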

        message(stringr::str_interp('Page ${pageNumber}: ${pageMarker}'))

        retry <- TRUE
        retries <- 0
        while (retry && retries < max_retries) {
            response <- tryCatch(eval(call), error = \(e) e)
            if (inherits(response, "error")) {
                # Re-throw the error once the final attempt has failed.
                if (retries + 1 >= max_retries) stop(response)

                wait_time <- 2 ^ retries / 10
                Sys.sleep(wait_time)
                retries <- retries + 1
            } else {
                retry <- FALSE
            }
        }

        pageNumber <- pageNumber + 1

        result <- c(result, list(response))
    }

    lastResponse <- result[[length(result)]]

    message(stringr::str_interp('Page ${pageNumber}: ${lastResponse$Contents[[length(lastResponse$Contents)]]$Key}'))

    return(result)
}
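
The retry logic above waits `2 ^ retries / 10` seconds between attempts (0.1 s, 0.2 s, 0.4 s, and so on). A minimal sketch of the same exponential backoff pulled out into a helper follows; `with_backoff`, `attempt`, and the injectable `sleep` argument are hypothetical names, not part of paws, and the flaky function only simulates a transient error.

```r
# Run `attempt()` up to `max_retries` times, sleeping 2^retries / 10
# seconds after each failure and re-throwing the last error.
# `sleep` is injectable so the helper can be exercised without waiting.
with_backoff <- function(attempt, max_retries = 5, sleep = Sys.sleep) {
  stopifnot(max_retries >= 1)
  for (retries in seq_len(max_retries) - 1) {
    result <- tryCatch(attempt(), error = function(e) e)
    if (!inherits(result, "error")) return(result)
    # Final attempt failed: surface the original error.
    if (retries + 1 >= max_retries) stop(result)
    sleep(2^retries / 10)
  }
}

# Simulated request that fails twice, then succeeds (assumption).
calls <- 0
flaky <- function() {
  calls <<- calls + 1
  if (calls < 3) stop("transient failure")
  "ok"
}

# Record the requested wait times instead of actually sleeping.
waits <- numeric(0)
outcome <- with_backoff(flaky, sleep = function(s) waits <<- c(waits, s))
```

With a real client, `attempt` would wrap the request for one page, for example `function() eval(call)` in the function above.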

dereckmezquita avatar Aug 15 '22 18:08 dereckmezquita

I also wrote it as an R6 class (compatible with the box module system) so that I can keep the s3 client object in scope and easily add more methods in the future:

S3 <- R6::R6Class(
    "S3",
    public = list(
        client = NULL,
        bucket_name = "",

        initialize = \(bucket_name, region = "us-east-1") {
            Sys.setenv(AWS_REGION = region)
            self$bucket_name <- bucket_name
            self$client <- paws::s3()
        },

        list_objects_paginated = \(
            f = self$client$list_objects(Bucket = self$bucket_name),
            max_retries = 5,
            max_pages = Inf
        ) {
            response <- f
            result <- list(response)

            truncated <- TRUE
            pageMarker <- NULL
            pageNumber <- 1
            while (truncated && pageNumber < max_pages) {
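                truncated <- isTRUE(response$IsTruncated)

                # Last page reached: stop before another request is issued.
                if (!truncated) break

                # list_objects pages by Marker: resume after the last key received.
                pageMarker <- response$Contents[[length(response$Contents)]]$Key
                call <- substitute(f)
                call$Marker <- pageMarker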

                message(stringr::str_interp('S3::list_objects_paginated: page ${pageNumber}: ${pageMarker}'))

                retry <- TRUE
                retries <- 0
                while (retry && retries < max_retries) {
                    response <- tryCatch(eval(call), error = \(e) e)
                    if (inherits(response, "error")) {
                        # Re-throw the error once the final attempt has failed.
                        if (retries + 1 >= max_retries) stop(response)

                        wait_time <- 2 ^ retries / 10
                        Sys.sleep(wait_time)
                        retries <- retries + 1
                    } else {
                        retry <- FALSE
                    }
                }

                pageNumber <- pageNumber + 1

                result <- c(result, list(response))
            }

            lastResponse <- result[[length(result)]]

            message(stringr::str_interp('S3::list_objects_paginated: page ${pageNumber}: ${lastResponse$Contents[[length(lastResponse$Contents)]]$Key}'))

            return(result)
        }
    )
)


## ------------
s3 <- S3$new(bucket_name = "some-name")

results <- s3$list_objects_paginated(max_pages = 5)

dereckmezquita avatar Aug 15 '22 20:08 dereckmezquita

The replies above are great. Thank you all for writing them! Using classes is beyond my skill and would be overkill for my use case, too. So I've used the examples above to make a function that suits my needs. Big thanks to @dereckmezquita because I've pretty much used one of his solutions here and just modified it for my own purposes.

This version has no printing, warnings, or messages, and no option to specify the maximum number of pages; it just returns all the results in a two-column tibble of Key and LastModified. It also checks whether the first response is truncated before attempting to request more results, so the function plays nicely with any number of pages, including just one.

list_s3_objects <- function(bucket, prefix, last_modified = TRUE, max_retries = 5) {
  
  response <- paws.storage::s3()$list_objects_v2(
    Bucket = bucket,
    Prefix = prefix
  )
  
  responses <- list(response)
  
  if (response[["IsTruncated"]]) {
    
    truncated <- TRUE
    
    while (truncated) {
      
      retry <- TRUE
      retries <- 0
      
      # If an error is returned by AWS then try again with exponential backoff
      while (retry && retries < max_retries) {
        
        response <- tryCatch(
          paws.storage::s3()$list_objects_v2(
            Bucket = bucket,
            Prefix = prefix,
            ContinuationToken = response[["NextContinuationToken"]]
          ), 
          error = \(e) e
        )
        
        if (inherits(response, "error")) {
          # Re-throw the error once the final attempt has failed.
          if (retries + 1 >= max_retries) stop(response)
          
          wait_time <- 2 ^ retries / 10
          Sys.sleep(wait_time)
          retries <- retries + 1
        } else {
          retry <- FALSE
        }
      }
      
      responses <- append(responses, list(response))
      
      truncated <- response[["IsTruncated"]]
      
    }
    
  } 
  
  df_responses <- responses |> 
    purrr::map("Contents") |> 
    purrr::map(
      \(x) purrr::map(
        x, \(y) purrr::keep(y, names(y) %in% c("Key", "LastModified"))
        )
      ) |> 
    dplyr::bind_rows()
  
  if (last_modified) {
    dplyr::arrange(df_responses, dplyr::desc(LastModified))
  } else {
    df_responses
  }
  
}
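
The flattening step at the end of this function can also be sketched in base R, without purrr or dplyr. This assumes each page is a response whose `Contents` entries carry a `Key` and a POSIXct `LastModified`; `pages_to_df` and the sample pages are hypothetical and only for illustration.

```r
# Flatten a list of list_objects_v2-style responses into a data frame
# with one row per object.
pages_to_df <- function(pages) {
  # Collect the Contents entries of every page into one flat list.
  objects <- unlist(
    lapply(pages, function(page) page$Contents),
    recursive = FALSE
  )
  # Go through numeric seconds so the timestamps keep a known time zone.
  modified <- vapply(
    objects,
    function(obj) as.numeric(obj$LastModified),
    numeric(1)
  )
  data.frame(
    Key = vapply(objects, function(obj) obj$Key, character(1)),
    LastModified = as.POSIXct(modified, origin = "1970-01-01", tz = "UTC"),
    stringsAsFactors = FALSE
  )
}

# Made-up pages standing in for real responses (assumption).
pages <- list(
  list(Contents = list(
    list(Key = "a.csv", LastModified = as.POSIXct("2023-01-01", tz = "UTC")),
    list(Key = "b.csv", LastModified = as.POSIXct("2023-03-01", tz = "UTC"))
  )),
  list(Contents = list(
    list(Key = "c.csv", LastModified = as.POSIXct("2023-02-01", tz = "UTC"))
  ))
)

df <- pages_to_df(pages)

# Newest objects first, mirroring last_modified = TRUE above.
newest_first <- df[order(df$LastModified, decreasing = TRUE), ]
```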

jimgar avatar Jul 24 '23 22:07 jimgar

Hi @TenanATC,

Thanks for this. I am currently implementing a paginate function that should make these kinds of operations simpler for users. PR: https://github.com/paws-r/paws/pull/650. It should be ready for paws v0.4.0.

But in short this is what it will offer:

library(paws.storage)

svc <- s3()

paginate(
    svc$list_objects_v2(Bucket = "made-up")
)

DyfanJones avatar Jul 24 '23 23:07 DyfanJones

Not to distract from the discussion, but as a user who has followed this thread from the beginning waiting for this: the minioclient R package provides a simple and fast way to list an arbitrary number of objects in a bucket, mc_ls("alias/bucket_name"). It merely calls an implementation in Go, so it avoids having to handle parsing and pagination in R routines, and is fully threaded.

cboettig avatar Jul 28 '23 20:07 cboettig

@cboettig that sounds really promising :)

Just an update on this: I have this in a good place and will be merging the PR shortly.

Examples:

library(paws)

svc <- s3()

# standard paginate
results <- svc$list_objects(Bucket = "mybucket") |> paginate(PageSize = 1000)

# paginate_lapply applies a function to each returned response.
results <- svc$list_objects(Bucket = "mybucket") |> paginate_lapply(\(resp) resp$Contents, PageSize = 1000)

Once I have merged #650 I will regenerate paws so that r-universe is updated. This will let anyone try this functionality out before it hits CRAN. :)

DyfanJones avatar Jul 28 '23 21:07 DyfanJones

This feature is currently on the dev r-universe version:

install.packages('paws', repos = c('https://paws-r.r-universe.dev', 'https://cloud.r-project.org'))

Please feel free to try it out :)

DyfanJones avatar Jul 31 '23 22:07 DyfanJones

paws v0.4.0 has now been released to CRAN. I will close this ticket for now.

DyfanJones avatar Sep 15 '23 16:09 DyfanJones