Python: Why is global dataflow not tracking `endpoints` in function `Service.start()`?
Python code from OpenStack
from neutron_lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import serializer as om_serializer
from oslo_service import service

LOG = logging.getLogger(__name__)

TRANSPORT = None


class RequestContextSerializer(om_serializer.Serializer):
    def __init__(self, base=None):
        pass
    pass


def get_server(target, endpoints, serializer=None):
    """Get a new RPC server reference.

    :param target: The target for the new RPC server.
    :param endpoints: The endpoints for the RPC server.
    :param serializer: The optional serialize to use for the RPC server.
    :returns: A new RPC server reference.
    """
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    serializer = RequestContextSerializer(serializer)
    return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
                                         'eventlet', serializer)


class Service(service.Service):
    """Service object for binaries running on hosts.

    A service enables rpc by listening to queues based on topic and host.
    """

    def __init__(self, host, topic, manager=None, serializer=None):
        super().__init__()
        self.host = host
        self.topic = topic
        self.serializer = serializer
        if manager is None:
            self.manager = self
        else:
            self.manager = manager

    def start(self):
        super().start()

        self.conn = Connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)
        endpoints = [self.manager]
        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

    def stop(self):
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self.conn.close()
        except Exception:  # nosec
            pass
        super().stop()


class Connection(object):
    """A utility class that manages a collection of RPC servers."""

    def __init__(self):
        super().__init__()
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        target = oslo_messaging.Target(
            topic=topic, server=cfg.CONF.host, fanout=fanout)
        server = get_server(target, endpoints)
        self.servers.append(server)

    def consume_in_threads(self):
        for server in self.servers:
            server.start()
        return self.servers

    def close(self):
        for server in self.servers:
            server.stop()
        for server in self.servers:
            server.wait()
CodeQL
import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

module MyConf implements DataFlow::ConfigSig {
  predicate isSource(DataFlow::Node source) {
    source.asExpr().isConstant() or
    source.asExpr() instanceof Name or
    source.asExpr() instanceof Attribute
  }

  predicate isSink(DataFlow::Node sink) {
    sink = API::moduleImport("oslo_messaging").getMember("get_rpc_server").getACall().getArg(2)
  }

  predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
    nodeTo.asCfgNode().(IterableNode).getAnElement() = nodeFrom.asCfgNode()
  }
}

module MyFlow = DataFlow::Global<MyConf>;

from DataFlow::Node source, DataFlow::Node sink
where MyFlow::flow(source, sink)
select source, sink
Output
| source | sink |
|---|---|
| `endpoints` in line 19 | `endpoints` in line 30 |
| `endpoints` in line 30 | `endpoints` in line 30 |
| `endpoints` in line 84 | `endpoints` in line 30 |
| `endpoints` in line 87 | `endpoints` in line 30 |
It may be caused by issue #17021.
Have you tried evaluating the `isSource` predicate using the "quick eval" feature of the CodeQL for VS Code extension? If the result doesn't include `endpoints = [self.manager]`, then something is wrong with the `isSource` predicate. If it is included, then most likely a flow step is missing, possibly because a method call could not be resolved. A trick you can use to debug is to implement `isSource` so that it selects only an expression in `endpoints = [self.manager]` and to define `predicate isSink(DataFlow::Node sink) { any() }`. That should let you explore all the dataflow going out of that single source and determine where flow gets "stuck".
> A trick you can use to debug is to implement `isSource` so that it selects only an expression in `endpoints = [self.manager]` and to define `predicate isSink(DataFlow::Node sink) { any() }`.
The flow stops at `endpoints` in `self.conn.create_consumer(self.topic, endpoints)`.
| source | sink |
|---|---|
| `[self.manager]` | `[self.manager]` |
| `[self.manager]` | `endpoints` in `endpoints = [self.manager]` |
| `[self.manager]` | `endpoints` in `self.conn.create_consumer(self.topic, endpoints)` |
So, is there any plan to fix the bug?
Hi @hksdpc255,
Sorry for the late reply. We have identified an incomplete call graph as the reason why the taint is not propagating as intended. Our Python language team is looking into the issue. I will provide an update when we know more.
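For readers following along, here is a minimal runtime sketch of the path that `endpoints` takes in the code from the question. It is not part of the reply above and does not claim to show which call edge the analysis misses. All classes and functions are simplified, hypothetical stand-ins (in particular, `get_rpc_server` is a stub that only records its argument, not the real `oslo_messaging` API). The sketch shows only that, at runtime, the list built in `Service.start()` does reach the third argument of `get_rpc_server`, which is the flow the query was expected to report.

```python
# Hypothetical stand-ins for illustration; none of this is the real oslo_messaging API.
recorded = []  # every `endpoints` value that reaches the stubbed sink


def get_rpc_server(transport, target, endpoints, executor, serializer):
    """Stub sink: records the `endpoints` argument it receives."""
    recorded.append(endpoints)
    return object()


def get_server(target, endpoints, serializer=None):
    # Passes `endpoints` through as the third positional argument (index 2).
    return get_rpc_server(None, target, endpoints, "eventlet", serializer)


class Connection:
    def __init__(self):
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        # The debug query in the thread reported flow stopping at the call
        # to this method.
        self.servers.append(get_server(topic, endpoints))


class Service:
    def __init__(self, topic, manager=None):
        self.topic = topic
        self.manager = self if manager is None else manager

    def start(self):
        # `self.conn` is an instance attribute, so a static analysis has to
        # work out which class it holds before it can resolve the call below.
        self.conn = Connection()
        endpoints = [self.manager]
        self.conn.create_consumer(self.topic, endpoints)


manager = object()
service = Service("demo-topic", manager)
service.start()
print(recorded[0][0] is manager)  # prints True
```

Because the value arrives at the sink when the program actually runs, the missing result in the query output points to a gap in the static analysis rather than to dead code in the example.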