
Downstream with rails

pschrammel opened this issue 2 years ago

Hi,

I'm currently trying to include ActionController::Live and write to the response stream (Ruby 3.1.1, Rails 7.0.2.2), but I get:

ThreadError (Attempt to unlock a mutex which is not locked):
  
/usr/local/lib/ruby/3.1.0/monitor.rb:202:in `synchronize'
/usr/local/lib/ruby/3.1.0/monitor.rb:202:in `mon_synchronize'
actionpack (7.0.2.2) lib/action_dispatch/http/response.rb:185:in `await_commit'
actionpack (7.0.2.2) lib/action_controller/metal/live.rb:288:in `block in process'
activesupport (7.0.2.2) lib/active_support/concurrency/share_lock.rb:187:in `yield_shares'
activesupport (7.0.2.2) lib/active_support/dependencies/interlock.rb:41:in `permit_concurrent_loads'
actionpack (7.0.2.2) lib/action_controller/metal/live.rb:287:in `process'
....

Any hints on whether this ought to work? If yes, any suggestions on what to try?

Thanks, awesome project!

Sample code (streams a file from S3 with async-http and should write it out chunked):

class CmsController < ApplicationController
  include ActionController::Live

  def get
    headers = nil
    S3_CLIENT.get_object(bucket: 'assets', key: request.env['REQUEST_PATH']) do |resp, chunk|
      unless headers
        headers = resp.headers.to_h
        logger.debug("Chunk received: #{headers}\n#{chunk.size}\n#{response.headers}\n#{chunk}\n")
        response.headers["Content-Type"] = headers["content-type"]
        response.headers.delete("Content-Length")
        response.headers["Cache-Control"] = "no-cache"
      end
      response.stream.write(chunk)
    end
    logger.debug("Done")
    response.stream.close
  end
end

pschrammel avatar Feb 27 '22 02:02 pschrammel

I'm trying to strip down the issue so the controller looks like this:

class CmsController < ApplicationController
  include ActionController::Live

  def get
    response.headers.delete("Content-Length")
    response.headers["X-Accel-Buffering"] = "no"
    logger.debug("Request: #{request.env['X-Request-ID']}, #{request.env['REQUEST_PATH']}")
    1.upto(60) do |nr|
      response.stream.write("<p>#{nr}</p>")
      logger.debug("sending: #{nr}")
      sleep 0.5
    end
    logger.debug("Done")
  ensure
    response.stream.close
  end
end

I don't get any errors, but it doesn't actually stream; the whole result is flushed only when response.stream.close is called.

pschrammel avatar Feb 27 '22 12:02 pschrammel

Okay, it seems the Live plugin is not supported and not needed. I solved the downstream with Rack hijack (just for others who have the same issue). It would be nice to have more control over the stream (like adding headers before writing the first chunk), but perhaps that's for something later. I also posted the async_s3_client to show how the proxying works (I hope I did it correctly).

class CmsController < ApplicationController
  def get
    s3_client = AsyncS3Client.new(
      access_key_id: ENV['AWS_ACCESS_KEY_ID'],
      secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
      region: 'eu-west-1'
    )

    response.headers["rack.hijack"] = lambda do |stream|
      s3_client.get_object(bucket: 'somebucket', key: request.env['REQUEST_PATH']) do |resp, chunk|
        if chunk == :done
          stream.close
          logger.debug("CMS: Done, #{stream.class}")
        else
          logger.debug("CMS: Chunk #{chunk.bytesize}")
          stream.write(chunk)
        end
      end
    end

    response.headers["Cache-Control"] = "no-cache"
    response.headers.delete("Content-Length")
    response.headers["X-Accel-Buffering"] = "no"
    response.headers["Content-Type"] = "text/plain"
    head :ok
  end
end

require 'async/http/internet'
require 'aws-sigv4'

class AsyncS3Client
  def initialize(access_key_id:, secret_access_key:, region:)
    @access_key_id = access_key_id
    @secret_access_key = secret_access_key
    @region = region
  end

  def get_object(bucket:, key:)
    raise "need block" unless block_given?
    internet = Async::HTTP::Internet.new
    host = "#{bucket}.s3.#{@region}.amazonaws.com"
    signer = Aws::Sigv4::Signer.new(
      service: 's3',
      region: @region,
      access_key_id: @access_key_id,
      secret_access_key: @secret_access_key,
      unsigned_headers: ["content-length"]
    )

    signature = signer.sign_request(
      http_method: 'GET',
      url: "https://#{host}#{key}",
      headers: {},
    )

    headers = signature.headers
    headers["user-agent"] = "XXXX"
    headers.delete('host')

    Async do
      begin
        response = internet.get("https://#{host}#{key}", headers)
        response.each do |chunk|
          yield response, chunk
        end
        yield response, :done
      rescue
        logger.error("Error: #{$!}")
      ensure
        logger.info("Closing: #{host}#{key}")
        internet.close
      end
    end
  end

  def logger
    Rails.logger
  end
end


pschrammel avatar Feb 27 '22 23:02 pschrammel

Rails 7.1 supports Rack 3, which allows you to use a streaming body. This works with both Falcon and Puma. I would recommend avoiding ActionController::Live.

I believe your controller can look something like this:

  def get
    s3_client = AsyncS3Client.new(
      access_key_id: ENV['AWS_ACCESS_KEY_ID'],
      secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
      region: 'eu-west-1'
    )

    headers = {
      'content-type' => 'text/plain',
      'cache-control' => 'no-cache',
      'x-accel-buffering' => 'no'
    }

    body = lambda do |stream|
      s3_client.get_object(bucket: 'somebucket', key: request.env['REQUEST_PATH']) do |resp, chunk|
        if chunk == :done
          stream.close
          logger.debug("CMS: Done, #{stream.class}")
        else
          logger.debug("CMS: Chunk #{chunk.bytesize}")
          stream.write(chunk)
        end
      end
    end

    self.response = Rack::Response[200, headers, body]
  end
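For context, the Rack 3 streaming-body contract this relies on is small: the body is any object responding to `call`, and the server invokes it with a stream that supports `write` and `close` once the headers have been sent. Here is a minimal plain-Ruby sketch of that contract; `StubStream` is a hypothetical stand-in for the stream Falcon or Puma would actually pass in:

```ruby
# A stub standing in for the stream object the server provides.
class StubStream
  attr_reader :chunks

  def initialize
    @chunks = []
    @closed = false
  end

  # The streaming body writes chunks to the stream as they become available.
  def write(chunk)
    @chunks << chunk
  end

  # The body is responsible for closing the stream when it is done.
  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# A Rack 3 streaming body: any #call-able taking the stream as its argument.
body = lambda do |stream|
  3.times { |i| stream.write("chunk #{i}\n") }
ensure
  stream.close
end

stream = StubStream.new
body.call(stream)
```

Because the body only runs when the server calls it, headers must be finalized before the first `write`, which is why they are set up front in the controller above.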

ioquatix avatar Feb 01 '24 10:02 ioquatix

Hey @ioquatix , sorry for the resurrect. We're trying to make Falcon work with Rails 7.1 + Rack 3 streaming, but the example you've shared above doesn't work - it seems Rails doesn't like getting a Rack::Response assigned into the regular controller response. Any chance you have an example of Rails 7.1 + Falcon streaming, or tips on how to proceed with debugging/making this work? Thanks!

gyfis avatar Mar 22 '24 14:03 gyfis

Sure, let me see what I've got.

ioquatix avatar Mar 22 '24 19:03 ioquatix

Do you mind sharing a controller action that is having problems?

ioquatix avatar Mar 23 '24 11:03 ioquatix

Sure! Here's what we're trying to do. This is a bit simplified, but the issue with Rack vs. Rails responses still applies.

  def query
    rack_response = Rack::Response.new

    rack_response.body = lambda do |stream|
      rack_response.set_header('Last-Modified', Time.now.httpdate) # Prevent Rack ETag middleware from buffering the response
      rack_response.set_header('Content-Type', 'application/json')

      stream.write("#{Oj.dump({ scheduling: true }, mode: :compat)}\n")

      response = Typhoeus.get('https://example.com')
      rack_response.status = response.code
      stream.write(response_data)
    rescue StandardError => e
      stream.write({ error: "[#{token_hash}] Unknown error" }.to_json)
      rack_response.status = :internal_server_error
    end

    self.response = rack_response
  end

gyfis avatar Mar 23 '24 12:03 gyfis