smarter_csv
Make SmarterCSV thread-safe
- SmarterCSV used instance variables on a module, so they were shared across all threads.
- When different threads ran SmarterCSV, they could overwrite each other's instance variables and effectively corrupt each other's data. The simplest way to show this is to run a full process call from multiple threads: each thread's data gets aggregated into one large result.
- By creating an instance and passing it around, we simulate an instance-based approach while keeping the current API.
- We also expose a thread-local set of instance variables, which should provide backwards compatibility if someone directly accesses the global instance variables on the SmarterCSV module (a spec did, which is how I noticed this behavior).
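A minimal sketch of the pattern described above, using a hypothetical toy module (ToyCSV and ToyCSV::Reader are illustrative names, not SmarterCSV's real code). The module-level process method creates a fresh Reader for each call, so all per-parse state lives on that instance, and it copies the final values into thread-local variables so module-level accessors keep working:

```ruby
# Hypothetical toy module illustrating "one instance per call";
# it is not SmarterCSV's real implementation.
module ToyCSV
  # Holds all per-parse state, so concurrent calls never share it.
  class Reader
    attr_reader :headers, :csv_line_count

    def initialize(text)
      @text = text
      @headers = nil
      @csv_line_count = 0
    end

    def process
      lines = @text.lines.map(&:chomp)
      @headers = lines.shift.split(",").map(&:to_sym)
      lines.map do |line|
        @csv_line_count += 1
        @headers.zip(line.split(",")).to_h
      end
    end
  end

  # The public API stays a module method; each call gets its own Reader.
  def self.process(text)
    reader = Reader.new(text)
    rows = reader.process
    # Thread-local copies keep module-level accessors working without
    # exposing one thread's values to another.
    Thread.current.thread_variable_set(:toy_csv_headers, reader.headers)
    Thread.current.thread_variable_set(:toy_csv_line_count, reader.csv_line_count)
    rows
  end

  def self.headers
    Thread.current.thread_variable_get(:toy_csv_headers)
  end

  def self.csv_line_count
    Thread.current.thread_variable_get(:toy_csv_line_count)
  end
end

# Run two different "files" on many threads at once.
results = 20.times.map do |i|
  text = i.even? ? "a,b\n1,2\n" : "x,y\n3,4\n5,6\n"
  Thread.new { [i, ToyCSV.process(text), ToyCSV.headers, ToyCSV.csv_line_count] }
end.map(&:value)
```

Because nothing per-parse is stored on the module itself, each thread's rows, headers, and line count stay its own, no matter how the threads interleave.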
Codecov Report
All modified and coverable lines are covered by tests.
Project coverage is 100.00%. Comparing base (07160c1) to head (0ad475f).
Current head 0ad475f differs from pull request most recent head 7f74f78. Consider uploading reports for the commit 7f74f78 to get more accurate results.
@@            Coverage Diff            @@
##              main     #277    +/-   ##
=========================================
  Coverage   100.00%  100.00%
=========================================
  Files           11       11
  Lines          380      379     -1
=========================================
- Hits           380      379     -1
Hi @jpcamara, can you help me understand the use case where you run into issues with threading? I'd like to fully understand the scenario you're encountering.
In the systems where I've used SmarterCSV over the years, I typically have several Sidekiq workers:
- one worker running SmarterCSV.process (optionally with chunking)
- optionally one worker that processes the chunks
- one worker that handles each line of the CSV
In systems where users upload CSV files, I upload/capture the file in S3 and then kick off the Sidekiq worker that downloads it and runs SmarterCSV.process on the file, which in turn kicks off the other workers.
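A rough, stdlib-only analogue of that layout, assuming plain Thread and Queue objects as stand-ins for Sidekiq workers and queues (no Sidekiq API is used). SharedStateParser is a hypothetical parser that keeps state in module-level instance variables; it is safe here only because a single producer thread ever calls it, while the worker threads just receive finished chunks:

```ruby
# Hypothetical parser with module-level state, like the pattern in this PR.
module SharedStateParser
  def self.process(text, chunk_size:)
    lines = text.lines.map(&:chomp)
    @headers = lines.shift.split(",").map(&:to_sym)
    lines.each_slice(chunk_size) do |slice|
      yield(slice.map { |line| @headers.zip(line.split(",")).to_h })
    end
  end
end

chunks = Queue.new  # stand-in for the queue feeding chunk workers
handled = Queue.new # collects rows handled by the workers

# Chunk workers: they never call the parser, they only consume chunks.
workers = Array.new(3) do
  Thread.new do
    while (chunk = chunks.pop) != :done
      chunk.each { |row| handled << row }
    end
  end
end

# The single "process" worker: the only thread that touches the parser.
text = "a,b\n" + (1..10).map { |n| "#{n},#{n * 2}\n" }.join
SharedStateParser.process(text, chunk_size: 3) { |chunk| chunks << chunk }
workers.size.times { chunks << :done } # one stop signal per worker
workers.each(&:join)

rows = Array.new(handled.size) { handled.pop }
puts "#{rows.size} rows handled"
```

The design choice is to fan out only after parsing: concurrency happens downstream of the parser, so its shared state is never contended.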
Hey @tilo! There are a few issues I'm encountering.
To start, I've committed more threading scenarios (in threading_spec.rb) that break on main. The spec now includes header specs, plus specs for line counts and chunk counts. I could write scenarios that corrupt the other instance variables in use as well.
While testing my upgrade to the newest version (1.10.3), I saw these issues, which I could easily reproduce, and I had to hold off on upgrading as a result. Most of my usages of SmarterCSV were still on 1.7.1 (https://github.com/tilo/smarter_csv/blob/v1.7.1/lib/smarter_csv.rb), which used instance variables only in a minor way. It had threading issues as well, but because the main processing did not depend on those variables, it didn't really matter.
I use SmarterCSV in a couple of different ways:
- In some simple cases, I just process a CSV in its entirety.
- In most cases, I process a CSV in chunks. Sometimes I split those chunks into separate jobs; sometimes I process the chunks inline.
I process CSVs from a variety of sources, in all kinds of formats with different headers, delimiters, and file sizes. I don't have one dedicated worker making the SmarterCSV.process call, so multiple workers/threads are operating on CSVs, often at the same time. As a result, it's very easy for me to hit the scenarios laid out in threading_spec.rb:
1. When not chunking, SmarterCSV can easily concatenate the results from each thread, so data from multiple sources gets mixed together. This is a big issue, particularly when processing user-submitted data: the results could merge multiple people's data together.
2. When chunking, headers can easily get mixed up. Sometimes headers are mixed between threads; sometimes they are just nil because the Ruby thread scheduler switched threads while the @headers assignment was happening.
3. The returned chunk_count and csv_line_counts are also wrong. If you rely on chunk_count for any kind of additional metadata or processing, you will get incorrect results when running multiple threads.
In CRuby, scenarios 1 and 3 are easy to reproduce. Scenario 2 takes a bit more work because the thread scheduler only switches between threads running Ruby code every 100ms, which is why the spec runs it multiple times. That just means it happens more intermittently, not that it doesn't happen. On TruffleRuby or JRuby it would be very easy to reproduce: I tried it on JRuby and it happened without the extra iterations, and it broke in more dramatic ways (headers are badly broken when run across multiple threads).
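To make scenario 1 concrete, here is a deterministic sketch using a hypothetical SharedResultParser that stores its result in a module-level instance variable (an assumption modeled on the behavior described above, not SmarterCSV's actual code). The after_row hook exists only to force one specific interleaving with a pair of Queues, so the race happens on every run instead of intermittently:

```ruby
# Hypothetical parser that keeps its result in a module-level ivar.
module SharedResultParser
  def self.process(text, after_row: nil)
    @result = [] # shared by every thread that calls process
    lines = text.lines.map(&:chomp)
    headers = lines.shift.split(",").map(&:to_sym)
    lines.each do |line|
      @result << headers.zip(line.split(",")).to_h
      after_row&.call # test hook used below to force an interleaving
    end
    @result
  end
end

a_paused = Queue.new
b_finished = Queue.new
paused_once = false

# Thread A parses the first row of its file, then waits until B is done.
thread_a = Thread.new do
  SharedResultParser.process("a\n1\n2\n", after_row: lambda {
    next if paused_once
    paused_once = true
    a_paused << true
    b_finished.pop
  })
end

a_paused.pop
# B runs to completion while A is paused and replaces @result with its own array.
result_b = SharedResultParser.process("b\n3\n")
b_finished << true
result_a = thread_a.value

# A lost its first row, and both callers now hold the same mixed array.
p result_a
p result_a.equal?(result_b)
```

In real code the pause comes from the thread scheduler rather than a hook, which is why the corruption shows up only some of the time.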
I'm confident my usage of SmarterCSV is not unconventional, and an instance-based approach completely removes these thread-safety issues.
Hi @tilo! Any thoughts on this?
I realized it's actually pretty easy to make scenario 2 happen, even on CRuby.
With the following CSVs and code, you can easily reproduce broken headers. Sometimes headers are swapped between threads, and sometimes they are nil and calling zip on them raises a NoMethodError. This happens even when chunking files; it just requires process to be called on multiple threads for different files. If you run this code, you'll see something similar to the following:
Iteration[124]: Wrong header! a,b,c,d,e,f,g
Iteration[207]: Wrong header! h,i,j,k,l,m,n
Iteration[256]: Wrong header! a,b,c,d,e,f,g
Iteration[264]: Wrong header! a,b,c,d,e,f,g
Iteration[258]: Wrong header! a,b,c,d,e,f,g
Iteration[323]: Wrong header! h,i,j,k,l,m,n
Iteration[322]: Wrong header! a,b,c,d,e,f,g
Iteration[380]: Wrong header! a,b,c,d,e,f,g
Iteration[443]: Wrong header! h,i,j,k,l,m,n
Iteration[444]: undefined method `zip' for nil:NilClass
...
csv_one.csv
a,b,c,d,e,f,g
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
csv_two.csv
h,i,j,k,l,m,n
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
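If you prefer to generate the two sample files rather than copy them, this small script should produce equivalent files: a header row followed by 84 identical data rows, which is the row count of the listings above. The write_sample helper is a hypothetical name introduced for this sketch:

```ruby
# Writes csv_one.csv and csv_two.csv: one header row plus 84 identical data rows.
ROW_COUNT = 84

# Hypothetical helper: header line, then `count` copies of `row`.
def write_sample(path, header, row, count)
  File.write(path, ([header] + Array.new(count, row)).join("\n") + "\n")
end

write_sample("csv_one.csv", "a,b,c,d,e,f,g", "1,2,3,4,5,6,7", ROW_COUNT)
write_sample("csv_two.csv", "h,i,j,k,l,m,n", "8,9,10,11,12,13,14", ROW_COUNT)
```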
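require "concurrent"   # provided by the concurrent-ruby gem
require "smarter_csv"

# Keeps posting jobs to a fixed-size thread pool until interrupted (Ctrl-C).
# Each job receives a unique, increasing iteration number.
# Note: the pool's job queue is unbounded, so memory grows the longer this runs.
def run_forever(pool_size: 10)
  pool = Concurrent::FixedThreadPool.new(pool_size)
  i = Concurrent::AtomicFixnum.new
  loop do
    pool.post do
      local_i = i.increment
      yield local_i
    end
  end
end

# Even iterations read csv_one.csv, odd iterations read csv_two.csv;
# every chunk must carry the headers of its own file.
run_forever(pool_size: 10) do |i|
  if i.even?
    SmarterCSV.process('csv_one.csv', chunk_size: 5) do |chunk|
      raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[a b c d e f g]
    end
  else
    SmarterCSV.process('csv_two.csv', chunk_size: 5) do |chunk|
      raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[h i j k l m n]
    end
  end
rescue StandardError => e
  puts "Iteration[#{i}]: #{e.message}"
end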
Sorry to be so noisy! This'll be my last comment on the issue for a while. I'm writing a series on concurrency in Ruby, and I share some of my thoughts on threading issues and class-level ivars there: https://jpcamara.com/2024/06/04/your-ruby-programs.html. Just more context on this issue.
Confirming that we're seeing this with 1.10.3, too. We had a worker processing two CSVs (one with more than a million rows) and got header/column corruption midway through processing. Please consider merging, @tilo.
I'll have a look at the examples you provided.
I'm looking into it, but wanted to point out that when calling it concurrently, the simplest fix is adding a mutex:
MUTEX = Mutex.new # a single lock shared by all threads

run_forever(pool_size: 10) do |i|
  if i.even?
    MUTEX.synchronize do
      SmarterCSV.process('csv_one.csv', chunk_size: 5) do |chunk|
        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[a b c d e f g]
      end
    end
  else
    MUTEX.synchronize do
      SmarterCSV.process('csv_two.csv', chunk_size: 5) do |chunk|
        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[h i j k l m n]
      end
    end
  end
rescue StandardError => e
  puts "Iteration[#{i}]: #{e.message}"
end
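A self-contained sketch of the same global-mutex idea, assuming a hypothetical SharedHeaderParser that keeps its headers in a module-level instance variable and a hypothetical safe_process wrapper, so the lock lives in one place instead of being repeated in each branch:

```ruby
# Hypothetical toy parser that keeps headers in a module-level ivar,
# standing in for the non-thread-safe code discussed above.
module SharedHeaderParser
  def self.process(text, chunk_size:)
    lines = text.lines.map(&:chomp)
    @headers = lines.shift.split(",").map(&:to_sym)
    lines.each_slice(chunk_size) do |slice|
      Thread.pass # invite a thread switch mid-parse
      yield(slice.map { |line| @headers.zip(line.split(",")).to_h })
    end
  end
end

# One process-wide lock: only one thread can be inside process at a time.
PARSE_MUTEX = Mutex.new

def safe_process(text, chunk_size:, &block)
  PARSE_MUTEX.synchronize { SharedHeaderParser.process(text, chunk_size: chunk_size, &block) }
end

one = "a,b\n" + "1,2\n" * 20
two = "x,y\n" + "3,4\n" * 20

errors = Queue.new
threads = 10.times.map do |i|
  Thread.new do
    text, expected = i.even? ? [one, %i[a b]] : [two, %i[x y]]
    safe_process(text, chunk_size: 5) do |chunk|
      errors << "wrong header: #{chunk.first.keys}" if chunk.first.keys != expected
    end
  end
end
threads.each(&:join)
puts errors.empty? ? "no header mix-ups" : "#{errors.size} header mix-ups"
```

Because the caller's chunk block runs while the lock is held, it should not call safe_process again: Ruby's Mutex is not reentrant and raises a ThreadError on recursive locking.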
My approach for handling thread safety issues in gems is to isolate that code to a "single_threaded" queue:
class UnsafeJob
  include Sidekiq::Job
  sidekiq_options queue: "single_threaded"

  def perform; end
end
Then I configure it to run with a concurrency of 1, and I instead run multiple processes or servers.
sidekiq -q single_threaded -c 1
Or put it in your config file:
:concurrency: 1
:queues:
  - single_threaded
This approach will work the same as a global mutex, but also allows horizontal scaling.
Using a global mutex technically works, but it means whichever thread acquires the mutex serializes all CSV processing: every other thread that wants to process a CSV has to wait until it is released, even when the 100ms thread scheduler switches threads.
But if this were running on a web server like Puma, a mutex is the only reasonable option, as suggested.
@contentfree @jpcamara please check out this PR: https://github.com/tilo/smarter_csv/pull/279
Looks good to me. The "Breaking Change" note in the changelog isn't quite accurate: It's not a breaking change (as SmarterCSV.process still works). It's merely deprecated with a warning.
👍
On Wed, Jul 3, 2024 at 8:33 AM Tilo wrote:
@contentfree @jpcamara please check out this PR: https://github.com/tilo/smarter_csv/pull/279
FYI: The fix from this PR will be released soon.
Closing in favor of this fix: https://github.com/tilo/smarter_csv/pull/279
Thanks @tilo !