dd-trace-rb
[APPSEC-8458] introduce AppSec::Utils::ReadWriteLock
What does this PR do?
Introduce a pure-Ruby read-write lock implementation. This RWLock will be used in subsequent work for Remote Client Configuration.
You can check this PR to understand where we will use the read-write lock.
The implementation provides a fair read-write lock: it allows any number of concurrent readers, but only one writer at a time. To avoid writer starvation, once a writer thread has signalled that it wants to acquire the lock, no new reader threads can acquire the read lock; they wait until the writer thread has finished writing.
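The fairness mechanism described above can be sketched with a `Mutex` and a `ConditionVariable`. This is a minimal illustration of the technique, not the PR's actual `AppSec::Utils::ReadWriteLock` code (the class name and internals here are assumptions):

```ruby
require 'thread'

# Minimal sketch of a writer-preferring read-write lock.
# Illustrative only; the actual AppSec::Utils::ReadWriteLock may differ.
class SketchRWLock
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @readers = 0          # readers currently holding the lock
    @writer = false       # whether a writer currently holds the lock
    @waiting_writers = 0  # queued writers; blocks new readers (fairness)
  end

  def with_read_lock
    @mutex.synchronize do
      # New readers wait while a writer holds the lock OR is waiting for it
      @cond.wait(@mutex) while @writer || @waiting_writers > 0
      @readers += 1
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @readers -= 1
        @cond.broadcast if @readers.zero?
      end
    end
  end

  def with_write_lock
    @mutex.synchronize do
      @waiting_writers += 1
      @cond.wait(@mutex) while @writer || @readers > 0
      @waiting_writers -= 1
      @writer = true
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @writer = false
        @cond.broadcast
      end
    end
  end
end
```

The key detail is `@waiting_writers`: readers check it before entering, so a pending writer only has to wait for the readers that are already inside, never for readers that arrive after it.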
Motivation
The motivation is to reduce contention when reading or writing the AppSec.processor singleton variable.
A background thread may occasionally replace the processor instance. While that update is in progress, we want every request to wait until the background thread has finished. The rest of the time, no background thread is modifying the processor instance, and we want as many concurrent read operations as possible.
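Under that access pattern, the lock would be used roughly like this. This is a sketch: the holder class, method names, and accessors are illustrative, not the actual dd-trace-rb API.

```ruby
# Hypothetical wrapper around a shared "processor" value guarded by a
# read-write lock. Names are illustrative, not the dd-trace-rb API.
class ProcessorHolder
  # lock must respond to with_read_lock / with_write_lock,
  # e.g. an AppSec::Utils::ReadWriteLock instance.
  def initialize(lock, processor)
    @lock = lock
    @processor = processor
  end

  # Request path: many concurrent readers of the processor singleton.
  def current
    @lock.with_read_lock { @processor }
  end

  # Remote Configuration background thread: a rare, exclusive writer.
  def replace!(new_processor)
    @lock.with_write_lock { @processor = new_processor }
  end
end
```

Reads dominate here, which is exactly the workload where a read-write lock should beat a plain mutex.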
Additional Notes
I provided a simple unit test for the implementation, and I also tested it against the following script. Writers always leave `data` even, so a reader observing an odd value means a reader overlapped with a writer:
```ruby
require 'benchmark'

TOTAL_THREADS = 100 # set as high as practicable, for a better test

def test(lock)
  puts "READ INTENSIVE (80% read, 20% write):"
  single_test(lock, (TOTAL_THREADS * 0.8).floor, (TOTAL_THREADS * 0.2).floor)

  puts "WRITE INTENSIVE (80% write, 20% read):"
  single_test(lock, (TOTAL_THREADS * 0.2).floor, (TOTAL_THREADS * 0.8).floor)

  puts "BALANCED (50% read, 50% write):"
  single_test(lock, (TOTAL_THREADS * 0.5).floor, (TOTAL_THREADS * 0.5).floor)
end

def single_test(lock, n_readers, n_writers, reader_iterations = 50, writer_iterations = 50, reader_sleep = 0.001, writer_sleep = 0.001)
  puts "Testing #{lock.class} with #{n_readers} readers and #{n_writers} writers. Readers iterate #{reader_iterations} times, sleeping #{reader_sleep}s each time, writers iterate #{writer_iterations} times, sleeping #{writer_sleep}s each time"

  mutex = Mutex.new
  bad = false
  data = 0

  result = Benchmark.measure do
    readers = n_readers.times.collect do |i|
      Thread.new do
        reader_iterations.times do
          lock.with_read_lock do
            # data must always be even while a read lock is held
            mutex.synchronize { bad = true } if (data % 2) != 0
            sleep(reader_sleep)
            mutex.synchronize { bad = true } if (data % 2) != 0
          end
        end
      end
    end

    writers = n_writers.times.collect do |i|
      Thread.new do
        writer_iterations.times do
          lock.with_write_lock do
            # increment data twice, leaving it odd only mid-write
            value = (data += 1)
            sleep(writer_sleep)
            data = value + 1
          end
        end
      end
    end

    readers.each { |t| t.join }
    writers.each { |t| t.join }

    puts "BAD!!! Readers+writers overlapped!" if mutex.synchronize { bad }
    puts "BAD!!! Writers overlapped! Expected data #{n_writers * writer_iterations * 2} got #{data}" if data != (n_writers * writer_iterations * 2)
    puts "Expected data #{n_writers * writer_iterations * 2} got #{data}"
  end

  puts result
end

test(Datadog::AppSec::Utils::ReadWriteLock.new)
```
These are the results from running it:
```
READ INTENSIVE (80% read, 20% write):
Testing Locks::ReadWriteMutex with 80 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
Expected data 2000 got 2000
  0.028927   0.095037   0.123964 (  1.388397)
WRITE INTENSIVE (80% write, 20% read):
Testing Locks::ReadWriteMutex with 20 readers and 80 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
Expected data 8000 got 8000
  0.089277   0.143242   0.232519 (  5.363435)
BALANCED (50% read, 50% write):
Testing Locks::ReadWriteMutex with 50 readers and 50 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
Expected data 5000 got 5000
  0.066041   0.151607   0.217648 (  3.345871)
```
How to test the change?
CI
Codecov Report
Merging #2717 (dce785c) into master (3c4c4ba) will decrease coverage by 0.01%. The diff coverage is 100.00%.
```diff
@@            Coverage Diff             @@
##           master    #2717      +/-  ##
==========================================
- Coverage   98.06%   98.06%    -0.01%
==========================================
  Files        1210     1212       +2
  Lines       66456    66567     +111
  Branches     2972     2975       +3
==========================================
+ Hits        65172    65280     +108
- Misses       1284     1287       +3
```

| Impacted Files | Coverage Δ | |
|---|---|---|
| lib/datadog/appsec/utils/read_write_lock.rb | 100.00% <100.00%> (ø) | |
| spec/datadog/appsec/utils/read_write_lock_spec.rb | 100.00% <100.00%> (ø) | |

... and 3 files with indirect coverage changes
@lloeki and I created a simple test app to compare a single mutex against the RWLock implementation from this PR.
The results show improved throughput and wait time when using the RWLock over a single mutex. Still, the improvement is nowhere near an order of magnitude, so it does not force us to adopt the RWLock implementation in production. We will begin with a single mutex, observe the results in our reliability environment, and revisit this topic if we see significant degradation.
Here are the code and results:
App:
```ruby
# config.ru
require 'pry'
require 'logger'
require 'digest'
require 'securerandom'
require 'json'

load 'rwmutex.rb'

logger = Logger.new(STDOUT, Logger::DEBUG)

mode = ENV['MODE']
read_work = (ENV['READ_WORK'] || 0.001).to_f
write_work = (ENV['WRITE_WORK'] || 0.01).to_f
req_work = (ENV['REQ_WORK'] || 0.10).to_f
calibration_filename = ENV['CALIBRATION'] || 'calibration.json'
interval = (ENV['INTERVAL'] || 1).to_f

mutex = case mode
        when 'mutex'
          Mutex.new
        when 'rwmutex'
          ReadWriteLock.new
        end

def work(times)
  times.times do
    Digest::SHA256.hexdigest(SecureRandom.uuid)
  end
end

# Calibrate how many hash operations per second this machine can do,
# caching the result so runs are comparable across modes.
if File.exist?(calibration_filename)
  data = JSON.parse(File.read(calibration_filename))
  $rate = data['rate']
  logger.debug { "#{Thread.current}: calibrated: #{$rate}/s" }
else
  logger.debug { "#{Thread.current}: calibrating" }
  times = 100_000
  start = Time.now
  work(times)
  end_ = Time.now
  duration = end_ - start
  $rate = times.to_f / duration
  File.write(calibration_filename, JSON.dump({ rate: $rate }))
  logger.debug { "#{Thread.current}: calibrated: #{$rate}/s" }
end

def work_for(duration)
  work(($rate * duration).to_i)
end

app = proc do |env|
  # Simulate a read of the shared resource under the configured lock mode
  case mode
  when 'mutex'
    mutex.synchronize do
      work_for(read_work)
    end
  when 'rwmutex'
    mutex.with_rlock do
      work_for(read_work)
    end
  else
    work_for(read_work)
  end

  request = Rack::Request.new(env)

  if request.path == '/'
    work_for(req_work)
    [200, { 'Content-Type' => 'text/plain' }, ['Hello, wat is love?']]
  else
    [404, { 'Content-Type' => 'text/plain' }, ['not found']]
  end
end

# Background thread simulating a periodic writer
Thread.new do
  loop do
    logger.debug { "#{Thread.current}: working" }

    # work
    case mode
    when 'mutex'
      mutex.synchronize do
        work_for(write_work)
      end
    when 'rwmutex'
      mutex.with_lock do
        work_for(write_work)
      end
    else
      work_for(write_work)
    end

    # wait
    logger.debug { "#{Thread.current}: sleeping" }
    sleep interval
  end
end

run app
```
Here are the results, using vegeta:

```
env MODE=none READ_WORK=0.001 WRITE_WORK=0.01 REQ_WORK=0.1 bundle exec puma -w 0 -t 10 -p 9292
echo "GET http://localhost:9292/" | vegeta attack -duration=60s -rate=0 -max-workers=20 | tee results.bin | vegeta report

Requests      [total, rate, throughput]         621, 10.35, 10.05
Duration      [total, attack, wait]             1m2s, 1m0s, 1.767s
Latencies     [min, mean, 50, 90, 95, 99, max]  97.991ms, 1.961s, 913.402ms, 2.663s, 12.822s, 14.512s, 17.544s
Bytes In      [total, mean]                     11799, 19.00
Bytes Out     [total, mean]                     0, 0.00
Success       [ratio]                           100.00%
Status Codes  [code:count]                      200:621
Error Set:
```

```
env MODE=mutex READ_WORK=0.001 WRITE_WORK=0.01 REQ_WORK=0.1 bundle exec puma -w 0 -t 10 -p 9292
echo "GET http://localhost:9292/" | vegeta attack -duration=60s -rate=0 -max-workers=20 | tee results.bin | vegeta report

Requests      [total, rate, throughput]         623, 10.37, 10.03
Duration      [total, attack, wait]             1m2s, 1m0s, 2.057s
Latencies     [min, mean, 50, 90, 95, 99, max]  100.14ms, 1.963s, 930.49ms, 3.547s, 12.218s, 14.262s, 19.494s
Bytes In      [total, mean]                     11837, 19.00
Bytes Out     [total, mean]                     0, 0.00
Success       [ratio]                           100.00%
Status Codes  [code:count]                      200:623
Error Set:
```

```
env MODE=rwmutex READ_WORK=0.001 WRITE_WORK=0.01 REQ_WORK=0.1 bundle exec puma -w 0 -t 10 -p 9292
echo "GET http://localhost:9292/" | vegeta attack -duration=60s -rate=0 -max-workers=20 | tee results.bin | vegeta report

Requests      [total, rate, throughput]         607, 10.11, 9.80
Duration      [total, attack, wait]             1m2s, 1m0s, 1.908s
Latencies     [min, mean, 50, 90, 95, 99, max]  98.565ms, 2.011s, 936.244ms, 2.822s, 12.596s, 14.772s, 19.719s
Bytes In      [total, mean]                     11533, 19.00
Bytes Out     [total, mean]                     0, 0.00
Success       [ratio]                           100.00%
Status Codes  [code:count]                      200:607
Error Set:
```
We are going to leave the PR open until we fully agree on the implementation we are going to use in production.
Given that we ended up not using it, shall we go ahead and close this PR for now?
I'm going to go ahead and close this one, since we ended up not using it for now :)