smarter_csv

Make SmarterCSV thread-safe

Open jpcamara opened this issue 10 months ago • 3 comments

  • SmarterCSV used instance variables on a module, so they were shared across all threads

  • When different threads ran SmarterCSV, they could overwrite the instance variable values that other threads had set, effectively corrupting each other's data. The simplest way to show this is to run a full process call from multiple threads: each thread's data gets aggregated into one large result

  • By creating an instance and passing it around internally, we get an instance-based approach while keeping the current module-level API

  • We also expose a thread-local set of instance variables, which should provide backwards compatibility for anyone who directly accesses the global instance variables on the SmarterCSV module (a spec did this, and that's how I noticed the behavior)
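A minimal sketch of the pattern described in these bullets, using a hypothetical `ToyCSV` module (not SmarterCSV's actual code). Per-call state lives on a `Reader` instance, the module-level `process` entry point keeps its signature, and the last headers are kept in a thread-local variable (the `:toy_csv_headers` key is also hypothetical) for code that still reads them from the module. `Thread.pass` only invites a thread switch mid-parse to show that it no longer matters.

```ruby
# Hypothetical ToyCSV, NOT SmarterCSV's real code: it only illustrates moving
# per-call state off the shared module and onto a per-call instance.
module ToyCSV
  class Reader
    attr_reader :headers

    def initialize(lines)
      @lines = lines
      @headers = nil
    end

    def process
      @headers = @lines.first.split(",").map(&:to_sym)
      Thread.pass # invite a thread switch; harmless, since @headers is per-instance
      @lines.drop(1).map { |line| @headers.zip(line.split(",")).to_h }
    end
  end

  # Same module-level entry point as before, but each call gets a fresh Reader.
  def self.process(lines)
    reader = Reader.new(lines)
    rows = reader.process
    # Backwards compatibility: remember this thread's last headers in a
    # thread-local variable instead of a module-level ivar.
    Thread.current.thread_variable_set(:toy_csv_headers, reader.headers)
    rows
  end

  def self.headers
    Thread.current.thread_variable_get(:toy_csv_headers)
  end
end

one = ["a,b", "1,2"]
two = ["c,d", "3,4"]
results = 20.times.map do |i|
  Thread.new { [ToyCSV.process(i.even? ? one : two), ToyCSV.headers] }
end.map(&:value)
```

Note that `Thread.current[:key]` would be fiber-local rather than thread-local; the sketch uses `thread_variable_set`/`thread_variable_get` to match the "thread-local" wording above.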

jpcamara, Apr 05 '24 02:04

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 100.00%. Comparing base (07160c1) to head (0ad475f).

:exclamation: Current head 0ad475f differs from pull request most recent head 7f74f78. Consider uploading reports for the commit 7f74f78 to get more accurate results

@@            Coverage Diff            @@
##              main      #277   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           11        11           
  Lines          380       379    -1     
=========================================
- Hits           380       379    -1     

codecov[bot], Apr 05 '24 02:04

Hi @jpcamara, can you help me understand the use case where you run into issues with threading? I'd like to fully understand the scenario you're encountering.

In the systems where I've used SmarterCSV over the years, I typically have several Sidekiq workers:

  • one worker running SmarterCSV.process (optionally with chunking)
  • optionally one worker that processes the chunks
  • one worker that handles each line of the CSV

In systems where users upload CSV files, I upload/capture the file in S3 and then kick off the Sidekiq worker that downloads it and runs SmarterCSV.process on it, which in turn kicks off the other workers.

tilo, Apr 09 '24 14:04

Hey @tilo! There are a few issues I'm encountering.

To start, I've committed more threading scenarios (in threading_spec.rb) that fail on main. They now include header specs, plus specs for line counts and chunk counts. I could create scenarios that corrupt the other instance variables in use as well.

While testing an upgrade to the newest version (1.10.3), I ran into these issues, could reproduce them easily, and had to hold off on upgrading as a result. Most of my usages of SmarterCSV were still on 1.7.1 (https://github.com/tilo/smarter_csv/blob/v1.7.1/lib/smarter_csv.rb), which used instance variables only in a minor way. That version had threading issues as well, but because the main processing did not depend on those variables, it didn't really matter.

I use SmarterCSV in a couple of different ways:

  • In some simple cases, I just process a CSV in its entirety
  • In most cases, I process a CSV in chunks. Sometimes I split those chunks into separate jobs; sometimes I process the chunks inline.

I am processing CSVs from a variety of sources, in many different formats, with different headers, delimiters, and file sizes. I don't have one dedicated worker making the SmarterCSV.process call, so multiple workers/threads operate on CSVs, often at the same time.

As a result, it's very easy for me to encounter the scenarios I have laid out in the threading_spec.rb.

  1. When not chunking, it's easy for SmarterCSV to concatenate the results from each thread together. That means multiple sources of data can get mixed together incorrectly. This is obviously a big issue, particularly if you are processing user-submitted data and produce results that merge multiple people's data together
  2. When chunking, it's easy to mix headers together. Sometimes headers are mixed between threads; sometimes they're just nil because the Ruby thread scheduler switched threads while the @headers assignment was happening.
  3. The returned chunk_count and csv_line_count values are also wrong. If you rely on chunk_count for any kind of additional metadata or processing, you will get incorrect results when running multiple threads.
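A stdlib-only sketch of failure modes 1 and 3, assuming a hypothetical `SharedStateCSV` module that keeps its result and chunk count in module-level instance variables, as described above. The `pause:` hook and the two queues are illustrative devices (not part of any real API) that force the unlucky interleaving deterministically instead of waiting for the scheduler; the main thread plays "thread B".

```ruby
# Hypothetical module that keeps per-call state in module-level ivars,
# so every thread shares the same @result and @chunk_count.
module SharedStateCSV
  class << self
    attr_reader :chunk_count
  end

  # `pause` is a hypothetical hook used only to force a thread switch
  # right after the shared state has been reset.
  def self.process(lines, chunk_size:, pause: nil)
    @result = []
    @chunk_count = 0
    pause&.call
    lines.each_slice(chunk_size) do |chunk|
      @result.concat(chunk) # appends to whichever array @result points at *now*
      @chunk_count += 1
    end
    @result
  end
end

a_reset = Queue.new
b_done = Queue.new

thread_a = Thread.new do
  SharedStateCSV.process(%w[a1 a2 a3 a4], chunk_size: 2,
                         pause: -> { a_reset << true; b_done.pop })
end

a_reset.pop # wait until thread A has reset the shared state
result_b = SharedStateCSV.process(%w[b1 b2], chunk_size: 2) # "thread B" runs to completion
b_done << true # let thread A resume
result_a = thread_a.value

p result_a                   # => ["b1", "b2", "a1", "a2", "a3", "a4"] (issue 1)
p result_b.equal?(result_a)  # => true, both calls return the same array
p SharedStateCSV.chunk_count # => 3 instead of 2 for file A (issue 3)
```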

In CRuby, 1 and 3 are easy to recreate. 2 takes a bit more work because the thread scheduler only switches between threads running Ruby code every 100ms, which is why the spec runs it multiple times. That just means it happens more intermittently, not that it doesn't happen. On TruffleRuby or JRuby it would be very easy to recreate: I tried it on JRuby and it happened without the extra iterations, and it broke in more dramatic ways (headers are very broken when run across multiple threads).

I'm confident my usage of SmarterCSV is not unconventional, and using an instance approach completely removes the thread safety issues.

jpcamara, Apr 10 '24 00:04

Hi @tilo! Any thoughts on this?

jpcamara, May 23 '24 22:05

I realized it's actually pretty easy to trigger issue 2, even on CRuby.

With the following CSVs and code, you can easily recreate broken headers. Sometimes headers are swapped between threads, and sometimes they are nil and raise a NoMethodError on zip. This happens even when chunking files; it only requires process to be called from multiple threads on different files. If you run this code, you'll see output similar to the following:

Iteration[124]: Wrong header! a,b,c,d,e,f,g
Iteration[207]: Wrong header! h,i,j,k,l,m,n
Iteration[256]: Wrong header! a,b,c,d,e,f,g
Iteration[264]: Wrong header! a,b,c,d,e,f,g
Iteration[258]: Wrong header! a,b,c,d,e,f,g
Iteration[323]: Wrong header! h,i,j,k,l,m,n
Iteration[322]: Wrong header! a,b,c,d,e,f,g
Iteration[380]: Wrong header! a,b,c,d,e,f,g
Iteration[443]: Wrong header! h,i,j,k,l,m,n
Iteration[444]: undefined method `zip' for nil:NilClass
...

csv_one.csv

a,b,c,d,e,f,g
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7

csv_two.csv

h,i,j,k,l,m,n
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
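8,9,10,11,12,13,14

require "concurrent" # entry point of the concurrent-ruby gem
require "smarter_csv"

def run_forever(pool_size: 10)
  # Bound the work queue: with :caller_runs, the posting thread runs a task itself
  # when the queue is full, instead of letting the queue grow without limit.
  pool = Concurrent::FixedThreadPool.new(pool_size, max_queue: pool_size * 2, fallback_policy: :caller_runs)
  i = Concurrent::AtomicFixnum.new

  loop do
    pool.post do
      local_i = i.increment
      yield local_i
    end
  end
end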

run_forever(pool_size: 10) do |i|
  if i.even?
    SmarterCSV.process('csv_one.csv', chunk_size: 5) do |chunk|
      raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[a b c d e f g]
    end
  else
    SmarterCSV.process('csv_two.csv', chunk_size: 5) do |chunk|
      raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[h i j k l m n]
    end
  end
rescue StandardError => e
  puts "Iteration[#{i}]: #{e.message}"
end

jpcamara, May 29 '24 00:05

Sorry to be so noisy! This'll be my last comment on the issue for a while. I'm writing a series on concurrency in Ruby, and I share some of my thoughts on threading issues and class-level ivars there: https://jpcamara.com/2024/06/04/your-ruby-programs.html. Just more context on this issue.

jpcamara, Jun 04 '24 19:06

Confirming that we're seeing this with 1.10.3, too. We had a worker processing two CSVs (one with more than a million rows) and hit header/column corruption midway through processing. Please consider merging, @tilo.

contentfree, Jun 11 '24 21:06

I'll have a look at the examples you provided.

tilo, Jun 25 '24 07:06

I'm looking into it, but wanted to point out that when calling it concurrently, the simplest fix is adding a mutex:

MUTEX = Mutex.new # a single lock shared by every thread in the process

run_forever(pool_size: 10) do |i|
  if i.even?
    MUTEX.synchronize do
      SmarterCSV.process('csv_one.csv', chunk_size: 5) do |chunk|
        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[a b c d e f g]
      end
    end
  else
    MUTEX.synchronize do
      SmarterCSV.process('csv_two.csv', chunk_size: 5) do |chunk|
        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[h i j k l m n]
      end
    end
  end
rescue StandardError => e
  puts "Iteration[#{i}]: #{e.message}"
end

tilo, Jul 02 '24 00:07

My approach for handling thread safety issues in gems is to isolate that code to a "single_threaded" queue:

require "sidekiq"

class UnsafeJob
  include Sidekiq::Job
  sidekiq_options queue: "single_threaded"

  def perform
    # thread-unsafe work (e.g. the SmarterCSV.process call) goes here
  end
end

Then I configure that queue to run with a concurrency of 1 and run multiple processes or servers instead.

sidekiq -q single_threaded -c 1
# or put it in your config file
:concurrency: 1
:queues: 
  - single_threaded

This approach will work the same as a global mutex, but also allows horizontal scaling.
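To make the "works like a global mutex" point concrete without Sidekiq, here is a stdlib-only sketch of what a concurrency-1 queue amounts to inside one process: a single consumer thread draining a `Queue`. `SingleThreadedQueue` is a hypothetical name, and the sketch does not model the horizontal scaling, which comes from running more Sidekiq processes.

```ruby
# Hypothetical plain-Ruby analogue of a queue processed with concurrency 1:
# one dedicated worker thread drains the queue, so the jobs it runs never
# overlap with each other inside this process.
class SingleThreadedQueue
  def initialize
    @jobs = Queue.new
    @worker = Thread.new do
      # Queue#pop blocks while empty and returns nil once the queue is closed and drained.
      while (job = @jobs.pop)
        begin
          job.call
        rescue StandardError => e
          warn "job failed: #{e.message}" # keep the single worker alive
        end
      end
    end
  end

  def push(&job)
    @jobs << job
  end

  def shutdown
    @jobs.close
    @worker.join
  end
end

active = 0
max_active = 0
completed = 0
stats = Mutex.new
queue = SingleThreadedQueue.new

# Several producer threads enqueue work concurrently...
producers = 4.times.map do
  Thread.new do
    5.times do
      queue.push do
        stats.synchronize { active += 1; max_active = [max_active, active].max }
        sleep 0.001 # stand-in for thread-unsafe CSV processing
        stats.synchronize { active -= 1; completed += 1 }
      end
    end
  end
end
producers.each(&:join)
queue.shutdown

p max_active # => 1, only one job ever runs at a time
```

Unlike a mutex taken inside each web request, callers here never block on the lock: they enqueue and move on, which is why this fits background jobs better than request handling.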

Using a global mutex technically works, but it means whichever thread acquires that mutex then completely hogs the GVL for any CSV processing, including bypassing the 100ms thread scheduler swap.

But if this were running on a web server like Puma, a mutex would be the only reasonable option, as you suggested.

jpcamara, Jul 02 '24 10:07

@contentfree @jpcamara please check out this PR: https://github.com/tilo/smarter_csv/pull/279

tilo, Jul 03 '24 12:07

Looks good to me. The "Breaking Change" note in the changelog isn't quite accurate: it's not a breaking change (SmarterCSV.process still works); it's merely deprecated with a warning.
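To illustrate the distinction drawn here, a minimal sketch of a non-breaking deprecation, using a hypothetical `LegacyCSV` module and `Reader` class (this is not the code from PR #279): the old module-level entry point still returns the same result, but it first prints a warning pointing callers at the instance-based API.

```ruby
# Hypothetical LegacyCSV, NOT the code from the PR linked above: it only
# illustrates a non-breaking deprecation of a module-level entry point.
module LegacyCSV
  # New instance-based API: all per-call state lives on the Reader.
  class Reader
    def initialize(input, options = {})
      @input = input
      @col_sep = options.fetch(:col_sep, ",")
    end

    def process
      @input.lines(chomp: true).map { |line| line.split(@col_sep) }
    end
  end

  # Old entry point keeps working (so not breaking), but warns and then
  # delegates to a fresh Reader.
  def self.process(input, options = {})
    warn "[DEPRECATION] LegacyCSV.process is deprecated; " \
         "use LegacyCSV::Reader.new(input, options).process instead", uplevel: 1
    Reader.new(input, options).process
  end
end

rows = LegacyCSV.process("a,b\n1,2") # warns on $stderr, still returns the rows
p rows # => [["a", "b"], ["1", "2"]]
```

Existing callers keep working and only see the warning; a breaking change would be removing `LegacyCSV.process` or changing what it returns.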

👍

contentfree, Jul 03 '24 14:07

FYI: The fix will be released soon from this PR

tilo, Jul 03 '24 22:07

Closing in favor of this fix: https://github.com/tilo/smarter_csv/pull/279

Thanks @tilo !

jpcamara, Jul 05 '24 15:07