
filter_parser: Transition from single record to stream

Open: Athishpranav2003 opened this pull request (24 comments)

Which issue(s) this PR fixes: Fixes #4100

What this PR does / why we need it:
Transition filter_parser from using filter_with_time to filter_stream logic.

Docs Changes:
N/A

Release Note:
N/A
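For context, Fluentd filters can implement either `filter_with_time(tag, time, record)`, which returns a single `[time, record]` pair (or `nil` to drop the event), or `filter_stream(tag, es)`, which receives a whole event stream and returns a new one. The sketch below is an editorial illustration, not code from this PR: `MiniEventStream`, `CommaSplitParser` and `SketchParserFilter` are hypothetical stand-ins, and the "one raw value yields several records" parser is an assumption chosen to show why the stream form is more flexible.

```ruby
# Hypothetical stand-in for an event stream: an ordered list of [time, record] pairs.
class MiniEventStream
  include Enumerable

  def initialize
    @events = []
  end

  def add(time, record)
    @events << [time, record]
  end

  def each(&block)
    @events.each(&block)
  end
end

# Hypothetical parser that can yield several records from one raw value.
class CommaSplitParser
  def parse(text)
    text.split(",").each { |part| yield({ "value" => part.strip }) }
  end
end

class SketchParserFilter
  def initialize(key_name)
    @key_name = key_name
    @parser = CommaSplitParser.new
  end

  # Record-at-a-time hook: returns a single [time, record] pair (or nil),
  # so in this sketch only the first parsed result can be passed on.
  def filter_with_time(_tag, time, record)
    first = nil
    @parser.parse(record[@key_name]) { |parsed| first ||= parsed }
    first && [time, first]
  end

  # Stream hook: builds a new stream, so one input event can become
  # zero, one, or many output events.
  def filter_stream(_tag, es)
    out = MiniEventStream.new
    es.each do |time, record|
      @parser.parse(record[@key_name]) { |parsed| out.add(time, parsed) }
    end
    out
  end
end

f = SketchParserFilter.new("data")
p f.filter_with_time("test", 1, { "data" => "a, b" })       # first result only
p f.filter_stream("test", [[1, { "data" => "a, b" }]]).to_a # both results
```

The design point is only the return type: a single pair versus a stream the filter builds itself.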

Athishpranav2003, Aug 29 '24

@daipom I have raised this as a draft. It still needs other changes, such as updating the related tests and exception handling.

For now, let's discuss the base logic; I will add those changes once we settle on it.

Athishpranav2003, Aug 29 '24

Thanks! I will take a look at this this weekend!

daipom, Aug 29 '24

@daipom Thank you. The filter_parser tests need to be revamped, since they depend on a few instance variables. Please check it in your free time and let me know.

Athishpranav2003, Aug 29 '24

@daipom any opinion on this logic? Should we go ahead with this?

Athishpranav2003, Sep 02 '24

@Athishpranav2003 Sorry for my late response. I got COVID-19 last weekend and only got a little work done this week :cry: I will look at this now; please wait a few more days :cry:

daipom, Sep 06 '24

It appears that adding some inner methods would improve code readability. For example ...

I think we need to think a bit more carefully about how error events are handled...

daipom, Sep 09 '24

@daipom Yeah, I wanted to make sure we are on the same page. I will refactor the code and then change the error handling part.

Athishpranav2003, Sep 09 '24

> @daipom Yeah, I wanted to make sure we are on the same page. I will refactor the code and then change the error handling part.

Thanks! This would be the right direction!

daipom, Sep 09 '24

@daipom you can check this now

Athishpranav2003, Sep 10 '24

@daipom I think the CI test failures are not related to this change.

Athishpranav2003, Sep 10 '24

Thanks!! I will take a look!

> I think the CI test failures are not related to this change.

Yes, they are not related.

daipom, Sep 10 '24

@daipom https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/test/plugin/test_filter_parser.rb#L659-L666 Could you please point out which emit_error_event call this test is checking?

Athishpranav2003, Sep 12 '24

After some investigation, I found this: https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/lib/fluent/plugin/filter.rb#L82-L104

The emit_error_event calls are on lines 90 and 99. This implies that previously we raised the exception while filtering a single record and caught it in filter_stream. Now that we implement filter_stream directly, the raise is not needed.
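A minimal sketch of that old control flow, assuming, as the linked filter.rb lines suggest, that the base `Filter#filter_stream` loops over the stream, calls `filter_with_time` for each record, and turns any raised exception into `router.emit_error_event(tag, time, record, error)`. `RecordingRouter` and `OldStyleFilter` are hypothetical stand-ins, not fluentd classes; the point is that the record-level hook has to raise for an error event to be emitted.

```ruby
# Hypothetical router stand-in that records error events instead of routing them.
class RecordingRouter
  attr_reader :errors

  def initialize
    @errors = []
  end

  def emit_error_event(tag, time, record, error)
    @errors << [tag, time, record, error]
  end
end

class OldStyleFilter
  attr_reader :router

  def initialize(router)
    @router = router
  end

  # Record-at-a-time hook: it can only signal failure by raising.
  def filter_with_time(_tag, time, record)
    data = record["data"] or raise ArgumentError, "data does not exist"
    [time, { "parsed" => data.upcase }]
  end

  # Sketch of the base-class loop: this rescue is what converts a raised
  # exception into an error event, then processing continues.
  def filter_stream(tag, es)
    out = []
    es.each do |time, record|
      begin
        new_time, new_record = filter_with_time(tag, time, record)
        out << [new_time, new_record] if new_time && new_record
      rescue => e
        router.emit_error_event(tag, time, record, e)
      end
    end
    out
  end
end

router = RecordingRouter.new
result = OldStyleFilter.new(router).filter_stream("test", [[1, { "data" => "v1" }], [2, { "foo" => "bar" }]])
p result              # the good record survives
p router.errors.size  # the bad one became an error event
```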

Failure: test_call_emit_error_event_when_parser_error(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"data"=>"{\"time\":[], \"f1\":\"v1\"}"}, #<Fluent::Plugin::Parser::ParserError: value must be a string or a number: [](Array)>)
  Defined expectations:
    should_receive(:emit_error_event).with("String", Integer, Hash, Fluent::Plugin::Parser::ParserError).once.
  <false> is not true.
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/test_unit_integration.rb:57:in `make_assertion'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/test_unit_integration.rb:65:in `check'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core_class_methods.rb:104:in `check'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/expectation_director.rb:41:in `call'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:150:in `block in method_missing'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:289:in `flexmock_wrap'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:146:in `method_missing'
(eval at /home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/partial_mock.rb:429):3:in `emit_error_event'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:90:in `rescue in block in filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:86:in `block in filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/event.rb:110:in `each'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:85:in `filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/filter.rb:49:in `block (2 levels) in instance_hook_after_started'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:43:in `feed_to_plugin'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:87:in `feed'
/home/aggressive_racer1/projects/fluentd/test/plugin/test_filter_parser.rb:664:in `block in test_call_emit_error_event_when_parser_error'
     661:     flexmock(d.instance.router).should_receive(:emit_error_event).
     662:       with("String", Integer, Hash, ParserError).once
     663:     d.run do
  => 664:       d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
     665:     end
     666:   end
     667: 
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:204:in `block in run_actual'
/usr/share/ruby/timeout.rb:186:in `block in timeout'
/usr/share/ruby/timeout.rb:41:in `handle_timeout'
/usr/share/ruby/timeout.rb:195:in `timeout'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:203:in `run_actual'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:96:in `run'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base_owner.rb:130:in `run'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:39:in `run'
/home/aggressive_racer1/projects/fluentd/test/plugin/test_filter_parser.rb:663:in `test_call_emit_error_event_when_parser_error'

I used a hacky workaround (a "jugaad") to find the function call.

Athishpranav2003, Sep 12 '24

> Now that we implement filter_stream directly, the raise is not needed.

Yes, we don't need to raise in the new implementation. However, router.emit_error_event should still be called as before.
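A minimal sketch of that shape, where `filter_stream` itself reports a bad record through `router.emit_error_event` and moves on to the next event instead of raising. `RecordingRouter`, `StreamFilter` and the `"data"` key check are hypothetical illustrations, not the code in this PR.

```ruby
# Hypothetical router stand-in that records error events instead of routing them.
class RecordingRouter
  attr_reader :errors

  def initialize
    @errors = []
  end

  def emit_error_event(tag, time, record, error)
    @errors << [tag, time, record, error]
  end
end

class StreamFilter
  attr_reader :router

  def initialize(router)
    @router = router
  end

  # Stream hook: handles each record itself, so a bad record is reported
  # directly as an error event and the loop continues (no raise needed).
  def filter_stream(tag, es)
    out = []
    es.each do |time, record|
      data = record["data"]
      if data.nil?
        router.emit_error_event(tag, time, record, ArgumentError.new("data does not exist"))
        next
      end
      out << [time, { "parsed" => data }]
    end
    out
  end
end

router = RecordingRouter.new
events = [[1, { "data" => "v1" }], [2, { "foo" => "bar" }], [3, { "data" => "v3" }]]
p StreamFilter.new(router).filter_stream("test", events)
p router.errors.size
```

From the router's point of view the result matches the old flow: one error event per failing record, and the remaining records still pass through.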

daipom, Sep 12 '24

> https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/test/plugin/test_filter_parser.rb#L659-L666
>
> Could you please point out which emit_error_event call this test is checking?

In the current implementation:

https://github.com/fluent/fluentd/blob/78a7972bfe6b421f08472701f04f00515ed24bee/lib/fluent/plugin/filter_parser.rb#L114-L119

In the new implementation:

https://github.com/fluent/fluentd/blob/9ec5a95c964cb2eb6340b7538e25c8547d4bbaec/lib/fluent/plugin/filter_parser.rb#L105-L108

This change is OK because emit_error_event is called similarly after all.

daipom, Sep 12 '24

@daipom I am not sure what this issue is:

Failure: test_filter_key_not_exist(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"foo"=>"bar"}, #<ArgumentError: data does not exist>)
  Defined expectations:
    should_receive(:emit_error_event).with(String, Integer, Hash, #<ArgumentError: data does not exist>).once.
  <false> is not true.

For now, I have pushed the alternative code, and the functionality looks the same as before. Could you please let me know which parts are still ambiguous and need tests?

Athishpranav2003, Sep 12 '24

> @daipom I am not sure what this issue is:
>
> Failure: test_filter_key_not_exist(ParserFilterTest):
>   in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"foo"=>"bar"}, #<ArgumentError: data does not exist>)
>   Defined expectations:
>     should_receive(:emit_error_event).with(String, Integer, Hash, #<ArgumentError: data does not exist>).once.
>   <false> is not true.

I see. This test specifies the expected arguments too strictly. It appears to require the exception instance to match exactly, but its stacktrace data can differ. We should relax the condition.
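The strictness comes from Ruby's `Exception#==`, which compares class, message and backtrace. An exception created with `ArgumentError.new` has no backtrace, while one that was raised and rescued does, so the two are unequal even with identical messages. Passing the class instead matches any `ArgumentError` through `Class#===`. This is a plain-Ruby illustration; that flexmock's `with` matching falls back on `===`/`==` in this way is an assumption worth confirming in the flexmock docs.

```ruby
# Built but never raised: backtrace is nil.
expected = ArgumentError.new("data does not exist")

# Raised and rescued: backtrace is populated.
actual =
  begin
    raise ArgumentError, "data does not exist"
  rescue ArgumentError => e
    e
  end

same_message = expected.message == actual.message # true
equal_instances = expected == actual              # false: nil vs. real backtrace
class_match = ArgumentError === actual            # true: Class#=== is an is_a? check

puts [same_message, equal_instances, class_match].inspect # prints [true, false, true]
```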

diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb
index ec36c836..4201cec8 100644
--- a/test/plugin/test_filter_parser.rb
+++ b/test/plugin/test_filter_parser.rb
@@ -650,7 +650,7 @@ class ParserFilterTest < Test::Unit::TestCase
   def test_filter_key_not_exist
     d = create_driver(CONFIG_NOT_IGNORE)
     flexmock(d.instance.router).should_receive(:emit_error_event).
-      with(String, Integer, Hash, ArgumentError.new("data does not exist")).once
+      with(String, Integer, Hash, ArgumentError).once
     assert_nothing_raised {
       d.run do
         d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'})

daipom, Sep 12 '24

Got it, @daipom. I will make the changes shortly.

Athishpranav2003, Sep 12 '24

Thanks!

daipom, Sep 12 '24

@daipom I have addressed the comments. Additionally, I felt there was a redundant rescue in the previous code, so I have removed that as well. Is it fine now?

Athishpranav2003, Sep 12 '24

I made another PR to add some test cases for abnormal cases.

  • #4638

Let's rebase after it is merged.

I confirmed the test results for this PR locally. Overall there is no problem, but a few tests fail. This is because this PR removes the following rescue, which slightly changes the error message.

https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/lib/fluent/plugin/filter_parser.rb#L114-L119

Given this, I believe we should keep this rescue, even if it looks redundant. It is preferable to keep error messages unchanged where possible, since changing them could affect existing users.

The result of #4638 tests applied to this PR:

Failure: test_parser_error[invalid format with default](ParserFilterTest::abnormal cases)
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:546:in `run_and_assert'
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:691:in `test_parser_error'
     688:         </parse>
     689:       EOC
     690: 
  => 691:       run_and_assert(driver, **data.except(:additional_config))
     692:     end
     693: 
     694:     data(
<[[],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "value must be a string or a number: [](Array)"]]]> expected but was
<[[],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "parse failed value must be a string or a number: [](Array)"]]]>

diff:
  [[],
   [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
   [[Fluent::Plugin::Parser::ParserError,
?    "parse failed value must be a string or a number: [](Array)"]]]
======================================================================================================
F
======================================================================================================
Failure: test_parser_error[mixed valid and invalid with default](ParserFilterTest::abnormal cases)
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:546:in `run_and_assert'
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:691:in `test_parser_error'
     688:         </parse>
     689:       EOC
     690: 
  => 691:       run_and_assert(driver, **data.except(:additional_config))
     692:     end
     693: 
     694:     data(
<[[{"f1"=>"v1"}],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "value must be a string or a number: [](Array)"]]]> expected but was
<[[{"f1"=>"v1"}],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "parse failed value must be a string or a number: [](Array)"]]]>

diff:
  [[{"f1"=>"v1"}],
   [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
   [[Fluent::Plugin::Parser::ParserError,
?    "parse failed value must be a string or a number: [](Array)"]]]
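The failures above fit a common Ruby pattern: a `rescue` that only re-raises a specific error looks redundant, but it keeps that error away from a later catch-all `rescue` that rewraps messages. A minimal sketch, assuming (not verified against filter_parser.rb) that the removed clause re-raised `ParserError` ahead of a generic `rescue => e` that raises a new `ParserError` prefixed with "parse failed ". `ParserError` here is a hypothetical local stand-in for `Fluent::Plugin::Parser::ParserError`, and the method names are invented for illustration.

```ruby
# Hypothetical local stand-in for Fluent::Plugin::Parser::ParserError.
class ParserError < StandardError; end

# Simulated parser step that rejects a non-scalar value.
def parse_value(value)
  if value.is_a?(Array)
    raise ParserError, "value must be a string or a number: #{value.inspect}(#{value.class})"
  end
  value
end

# With the pass-through clause: ParserError is re-raised untouched, because an
# exception raised inside a rescue clause is not caught by sibling clauses.
def parse_keeping_passthrough(value)
  parse_value(value)
rescue ParserError => e
  raise e
rescue => e
  raise ParserError, "parse failed #{e.message}"
end

# Without it: the catch-all rescue also catches ParserError and rewraps it.
def parse_without_passthrough(value)
  parse_value(value)
rescue => e
  raise ParserError, "parse failed #{e.message}"
end

# Returns the ParserError message raised by the block, or nil.
def parser_error_message
  yield
  nil
rescue ParserError => e
  e.message
end

puts parser_error_message { parse_keeping_passthrough([]) }
# prints value must be a string or a number: [](Array)
puts parser_error_message { parse_without_passthrough([]) }
# prints parse failed value must be a string or a number: [](Array)
```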

daipom, Sep 16 '24

@daipom Thanks for the help. I had difficulty figuring out the edge cases. Thanks for adding them alongside this so it can move ahead.

Athishpranav2003, Sep 16 '24

@daipom I have reverted the removal of the redundant rescue. Can you point out any change in the error messages if it's still present in this PR?

Athishpranav2003, Sep 18 '24

@ashie @kenhys Which version should we release this on?

This solves the limitation of #4474, which is released on v1.17.0. It's ambiguous whether it's a bug fix or an enhancement.

I think it is possible to release this on v1.17. On the other hand, since this is a large fix for filter_parser, 1.18 may be safer, given the risk of regression.

daipom, Sep 18 '24

> On the other hand, since this is a large fix for filter_parser, 1.18 may be safer, given the risk of regression.

I second v1.18.

kenhys, Nov 21 '24

Thanks! I have confirmed this change passes tests on #4638. I believe we are now ready to merge! (after #4638) Thanks so much for this fix!

Now that #4638 is merged, I have rebased this. I will merge it after checking CI. Thanks!

daipom, Nov 27 '24