
Decision: Post Processor Efficiency

Open akniffe1 opened this issue 7 years ago • 0 comments

The post processor is awesome, but tends to be underutilized. That underuse is something of a blessing, because the post processor has a potential scaling limitation, and its error handling could use some extension as well.

For each post processor signature we pipe a fresh json.dumps of the scan_report to the JQ interpreter, meaning we loop through the sigs one at a time and re-serialize the report on every iteration. On top of that, we abandon all remaining post processing steps if there's a problem on a single signature attempt--so one poorly designed JQ sig can block every other sig, with little logging to indicate why. We could treat the JQ sigs like yara sigs and attempt to "compile" them (in this case, a simple test run through the JQ interpreter) before initializing the fsf-server. We could also capture the exit code from Popen so there's some manner of audit trail for failed sigs.
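One way to sketch that startup check (illustrative only, not FSF's actual API -- the function name `validate_jq_sigs` and the trivial `{}` test input are assumptions) is to dry-run every sig through the JQ interpreter before the daemon starts, the same way yara sigs are compiled up front:

```python
import json
from shutil import which  # modern stand-in for distutils' find_executable
from subprocess import Popen, PIPE, STDOUT

def validate_jq_sigs(sig_paths):
    """Dry-run each JQ sig against an empty JSON document before daemon start.

    Returns (good, bad): `bad` holds (path, exit_code) tuples for sigs the
    JQ interpreter rejected, so they can be logged and excluded up front.
    """
    jq_location = which('jq')
    if jq_location is None:
        raise RuntimeError('Unable to find JQ')
    good, bad = [], []
    for path in sig_paths:
        proc = Popen([jq_location, '-f', path], stdin=PIPE, stdout=PIPE,
                     stderr=STDOUT, universal_newlines=True)
        proc.communicate(input=json.dumps({}))
        if proc.returncode:
            bad.append((path, proc.returncode))
        else:
            good.append(path)
    return good, bad
```

Anything landing in `bad` would be logged and dropped before fsf-server initializes, so a broken sig never reaches the live post-processing loop.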

Fixes:

  • test JQ sigs before initializing the daemon
  • dump the scan report to JSON just once, and update the post processor documentation to note that post processor results are not themselves post-processed (inception?). Alternatively, if a JQ sig returns a positive response, we can update the scan_report and then re-run json.dumps
  • switch from the default json library to simplejson for this function, as there's a well documented 50-60% performance gain over the builtin json library.
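The simplejson swap can be done with a guarded import so nothing breaks where simplejson isn't installed (the fallback idiom is a common pattern, not something FSF does today):

```python
try:
    import simplejson as json  # C-accelerated drop-in for the stdlib module
except ImportError:
    import json  # fall back to the builtin library

# Either module exposes the same dumps/loads interface, so the
# post processor code itself does not need to change.
report_json = json.dumps({'alert': False, 'observations': []})
```

Because simplejson is API-compatible with the stdlib module, the rest of the function is untouched by the swap.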

See below:

# Imports this snippet relies on (defined elsewhere in FSF's processor.py);
# `disposition` is FSF's signature configuration module.
import json
import os
from datetime import datetime as dt
from distutils.spawn import find_executable
from subprocess import Popen, PIPE, STDOUT

def post_processor(s, report):

    observations = []

    jq_location = find_executable('jq')
    if jq_location is None:
        s.dbg_h.error('%s Unable to find JQ, aborting post-processing routine...' % dt.now())
        return

    for script, observation, alert in disposition.post_processor:
        # json.dumps runs once per signature, re-serializing the same report
        args = [jq_location, '-f', '%s/%s/%s' % (os.path.dirname(os.path.realpath(__file__)), 'jq', script)]
        proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
        results = proc.communicate(input=json.dumps(report))[0].split('\n')

        if proc.returncode:
            # A single failing sig abandons all remaining sigs here
            s.dbg_h.error('%s There was a problem executing the JSON interpreter...' % dt.now())
            return

        for r in results:
            if r == 'true':
                observations.append(observation)
                # Allow ourselves to alert on certain observations
                if alert:
                    s.alert = True
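Putting the proposed fixes together, the loop might look like the sketch below: the report is serialized once, a failing sig is logged with its exit code and skipped rather than aborting everything, and `shutil.which` plus text-mode pipes keep it portable. The `disposition` class here is a stand-in for FSF's signature config, and `s` is assumed to be the server object with a `dbg_h` logger and `alert` flag, as in the snippet above:

```python
import json
import os
from datetime import datetime as dt
from shutil import which
from subprocess import Popen, PIPE, STDOUT

class disposition:
    """Stand-in for FSF's signature config: (script, observation, alert) tuples."""
    post_processor = []

def post_processor(s, report):
    observations = []

    jq_location = which('jq')
    if jq_location is None:
        s.dbg_h.error('%s Unable to find JQ, aborting post-processing routine...' % dt.now())
        return observations

    report_json = json.dumps(report)  # serialize the scan report once

    for script, observation, alert in disposition.post_processor:
        sig_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'jq', script)
        proc = Popen([jq_location, '-f', sig_path], stdin=PIPE, stdout=PIPE,
                     stderr=STDOUT, universal_newlines=True)
        results = proc.communicate(input=report_json)[0].split('\n')

        if proc.returncode:
            # Log which sig failed and its exit code, then keep going so one
            # bad sig cannot block the rest.
            s.dbg_h.error('%s JQ sig %s failed with exit code %d' %
                          (dt.now(), script, proc.returncode))
            continue

        for r in results:
            if r == 'true':
                observations.append(observation)
                # Allow ourselves to alert on certain observations
                if alert:
                    s.alert = True

    return observations
```

Returning `observations` (and logging per-sig failures) also gives callers the audit trail the issue asks for, since every skipped sig leaves a record with its exit code.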

akniffe1, Apr 08 '17 01:04