codeql icon indicating copy to clipboard operation
codeql copied to clipboard

Python: Why global dataflow not tracking `endpoints` in function `Service.start()`?

Open hksdpc255 opened this issue 1 year ago • 5 comments
trafficstars

Python code from openstack

from neutron_lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import serializer as om_serializer
from oslo_service import service


LOG = logging.getLogger(__name__)
TRANSPORT = None


class RequestContextSerializer(om_serializer.Serializer):
    def __init__(self, base=None):
        pass

    pass

def get_server(target, endpoints, serializer=None):
    """Get a new RPC server reference.

    :param target: The target for the new RPC server.
    :param endpoints: The endpoints for the RPC server.
    :param serializer: The optional serialize to use for the RPC server.
    :returns: A new RPC server reference.
    """
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    serializer = RequestContextSerializer(serializer)
    return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
                                         'eventlet', serializer)

class Service(service.Service):
    """Service object for binaries running on hosts.

    A service enables rpc by listening to queues based on topic and host.
    """
    def __init__(self, host, topic, manager=None, serializer=None):
        super().__init__()
        self.host = host
        self.topic = topic
        self.serializer = serializer
        if manager is None:
            self.manager = self
        else:
            self.manager = manager

    def start(self):
        super().start()

        self.conn = Connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

    def stop(self):
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self.conn.close()
        except Exception:  # nosec
            pass
        super().stop()


class Connection(object):
    """A utility class that manages a collection of RPC servers."""

    def __init__(self):
        super().__init__()
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        target = oslo_messaging.Target(
            topic=topic, server=cfg.CONF.host, fanout=fanout)
        server = get_server(target, endpoints)
        self.servers.append(server)

    def consume_in_threads(self):
        for server in self.servers:
            server.start()
        return self.servers

    def close(self):
        for server in self.servers:
            server.stop()
        for server in self.servers:
            server.wait()

CodeQL

import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

module MyConf implements DataFlow::ConfigSig {
    predicate isSource(DataFlow::Node source) {
        source.asExpr().isConstant() or
        source.asExpr() instanceof Name or
        source.asExpr() instanceof Attribute
    }
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("oslo_messaging").getMember("get_rpc_server").getACall().getArg(2)
    }
    predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo){
        nodeTo.asCfgNode().(IterableNode).getAnElement() = nodeFrom.asCfgNode()
    }
}

module MyFlow = DataFlow::Global<MyConf>;

from DataFlow::Node source, DataFlow::Node sink
where MyFlow::flow(source, sink)
select source, sink

Output

source sink
endpoints in line 19 endpoints in line 30
endpoints in line 30 endpoints in line 30
endpoints in line 84 endpoints in line 30
endpoints in line 87 endpoints in line 30

hksdpc255 avatar Jul 19 '24 09:07 hksdpc255

It may be caused by Issue#17021.

hksdpc255 avatar Jul 19 '24 10:07 hksdpc255

Have you tried evaluation the isSource predicate using the "quick eval" feature of the CodeQL for VScode extension? If that doesn't include endpoints = [self.manager] then there is something wrong with the isSource predicate. If it is included then most likely some flow step is missing, possibly due to a failure to resolve a method. A trick you can use to debug is to implement isSource to only select an expression in endpoints = [self.manager] and define predicate isSink() { any() }. That should allow you to explore all the dataflow going out of that single source and determine where flow gets "stuck".

aibaars avatar Jul 19 '24 15:07 aibaars

A trick you can use to debug is to implement isSource to only select an expression in endpoints = [self.manager] and define predicate isSink() { any() }. The flow stops at endpoints in self.conn.create_consumer(self.topic, endpoints).

source sink
[self.manager] [self.manager]
[self.manager] endpoints in endpoints = [self.manager]
[self.manager] endpoints in self.conn.create_consumer(self.topic, endpoints)

hksdpc255 avatar Jul 22 '24 06:07 hksdpc255

So, is there any plan to fix the bug?

hksdpc255 avatar Jul 23 '24 05:07 hksdpc255

Hi @hksdpc255,

Sorry for the late reply. We have identified an incomplete call graph as the reason why the taint is not propagating as intended. Our Python language team is looking into the issue. I will provide an update when we know more.

rvermeulen avatar Oct 15 '24 15:10 rvermeulen