caringcaribou
Add standalone mock ECU module
It would be nice to be able to simply start a mock ECU directly through a separate module for testing, e.g. `./cc.py -i vcan0 mock uds` to start a UDS mock ECU on interface vcan0. This could prove useful when playing around, learning new protocols or developing new modules.
Hello, I'm a Scapy contributor working on its automotive features. I've implemented a mechanism to simulate the UDS behaviour of an ECU in Scapy. Most of my work is documented here: https://scapy.readthedocs.io/en/latest/layers/automotive.html
Some of my unit tests show how to simulate an ECU. A list of supported responses can be defined (https://github.com/secdev/scapy/blob/876d9b86f96df6df532a50303f76fad05d2a0539/test/contrib/automotive/ecu_am.uts#L288-L298) and passed to an `ECU_am` (ECU answering machine) object (https://github.com/secdev/scapy/blob/876d9b86f96df6df532a50303f76fad05d2a0539/test/contrib/automotive/ecu_am.uts#L302-L303).
I would be happy if you found some of my work useful.
Cool! Thanks a lot @polybassa - I'm sure this will indeed prove useful :+1:
Thanks for your answer. I would be glad if you could give me feedback once you have used Scapy.
Did anyone manage to get the mock ECU working? I get a NameError when I try to reference the `ECU()` object. Is this a known issue? This is the bare minimum code that I was trying to run with Python:
from scapy.all import *
load_contrib("isotp")
load_contrib("automotive.uds")
load_layer("can")
load_contrib("automotive.ecu")
ecu = ECU() #NameError: name 'ECU' is not defined
assert ecu.current_session == 1
assert ecu.current_security_level == 0
assert ecu.communication_control == 0
As the end result, I want a mock ECU with some UDS services available, which I can then scan for UDS services using CaringCaribou on a virtual CAN interface. Is this possible? How should I proceed?
Hi, thanks for your question. Could you provide the full debug output? I can't explain why this is not working.
# A standalone script needs load_layer, conf, warning etc. imported explicitly
from scapy.all import *
load_layer("can")
conf.contribs['CAN']['swap-bytes'] = False
import os, threading, six, subprocess, sys, time  # time is used by time.sleep() below
from subprocess import call
from scapy.consts import LINUX
iface0 = "vcan0"
iface1 = "vcan1"
ISOTP_KERNEL_MODULE_AVAILABLE = False
def exit_if_no_isotp_module():
    if not ISOTP_KERNEL_MODULE_AVAILABLE:
        sys.stderr.write("TEST SKIPPED: can-isotp not available" + os.linesep)
        warning("Can't test ISOTP native socket because kernel module is not loaded")
        exit(0)
if 0 != call(["cansend", iface0, "000#"]):
    # vcan0 is not enabled
    if 0 != call(["sudo", "modprobe", "vcan"]):
        raise Exception("modprobe vcan failed")
    if 0 != call(["sudo", "ip", "link", "add", "name", iface0, "type", "vcan"]):
        print("add %s failed: Maybe it was already up?" % iface0)
    if 0 != call(["sudo", "ip", "link", "set", "dev", iface0, "up"]):
        raise Exception("could not bring up %s" % iface0)
if 0 != call(["cansend", iface0, "000#"]):
    raise Exception("cansend doesn't work")
if 0 != call(["cansend", iface1, "000#"]):
    # vcan1 is not enabled
    if 0 != call(["sudo", "modprobe", "vcan"]):
        raise Exception("modprobe vcan failed")
    if 0 != call(["sudo", "ip", "link", "add", "name", iface1, "type", "vcan"]):
        print("add %s failed: Maybe it was already up?" % iface1)
    if 0 != call(["sudo", "ip", "link", "set", "dev", iface1, "up"]):
        raise Exception("could not bring up %s" % iface1)
if 0 != call(["cansend", iface1, "000#"]):
    raise Exception("cansend doesn't work")
print("CAN should work now")
if six.PY3 and LINUX:
    from scapy.contrib.cansocket_native import *
new_can_socket = lambda iface: CANSocket(iface)
new_can_socket0 = lambda: CANSocket(iface0)
new_can_socket1 = lambda: CANSocket(iface1)
# Open and close one socket to verify that the interface is usable
s = new_can_socket(iface0)
s.close()
# Check whether the can_isotp kernel module is loaded (lsmod | grep ^can_isotp)
p1 = subprocess.Popen(['lsmod'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', '^can_isotp'], stdout=subprocess.PIPE, stdin=p1.stdout)
p1.stdout.close()
if p1.wait() == 0 and p2.wait() == 0 and b"can_isotp" in p2.stdout.read():
    p = subprocess.Popen(["isotpsend", "-s1", "-d0", iface0], stdin=subprocess.PIPE)
    p.communicate(b"01")
    if p.returncode == 0:
        ISOTP_KERNEL_MODULE_AVAILABLE = True
conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': ISOTP_KERNEL_MODULE_AVAILABLE}
load_contrib("isotp")
from scapy.contrib.isotp import ISOTPNativeSocket
ISOTPSocket = ISOTPNativeSocket
assert ISOTPSocket == ISOTPNativeSocket
load_contrib('automotive.uds')
load_contrib('automotive.ecu')
# session accepts a single value, a range, a list, or a predicate
example_responses = [
    ECUResponse(session=2, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
    ECUResponse(session=range(3,5), security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
    ECUResponse(session=[5,6,7], security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
    ECUResponse(session=lambda x: 8 < x <= 10, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")),
]
success = False
with new_can_socket0() as isocan1, ISOTPSocket(isocan1, sid=0x700, did=0x600, basecls=UDS) as ecu, \
        new_can_socket0() as isocan2, ISOTPSocket(isocan2, sid=0x600, did=0x700, basecls=UDS) as tester:
    answering_machine = ECU_am(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
    sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
    sim.start()
    # Simulator is running for 60s, see timeout parameter
    time.sleep(60)
    # Kill SIMULATOR
    tester.send(UDS(service=0xff))
    sim.join(timeout=10)
The ECU simulator is parametrized through a list of ECUResponse objects. The order of this list is important: as soon as the answering machine finds a response that fits a request, it sends that response. This means a more specific response has to come before a more general one.
This code shows two ISOTPSockets: one acts as the ECU socket, the other as the tester socket. You can also create your own tester socket and send commands via CaringCaribou. The kwargs of the ECU_am object determine its runtime behaviour. You can specify `timeout`, `count` or `stop_filter` to determine when the simulator should be stopped.
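The ordering rule can be illustrated without Scapy. The following is only a minimal sketch of first-match lookup: `Rule` and `first_match` are hypothetical names made up for this illustration and are not part of Scapy's `ECUResponse` or `ECU_am`. The session matching on a single value, a range, a list or a predicate mirrors the forms used in `example_responses` above; matching the request by byte prefix is an assumption made to keep the sketch short.

```python
from typing import Callable, Iterable, List, Optional, Union

SessionSpec = Union[int, Iterable[int], Callable[[int], bool]]


class Rule:
    """Hypothetical stand-in for one response entry (not Scapy's ECUResponse)."""

    def __init__(self, session: SessionSpec, request: bytes, response: bytes):
        self.session = session
        self.request = request    # prefix the incoming request must start with
        self.response = response

    def session_matches(self, current: int) -> bool:
        # Accept a predicate, a single session number, or any container of numbers.
        if callable(self.session):
            return bool(self.session(current))
        if isinstance(self.session, int):
            return self.session == current
        return current in self.session

    def matches(self, current_session: int, request: bytes) -> bool:
        return self.session_matches(current_session) and request.startswith(self.request)


def first_match(rules: List[Rule], session: int, request: bytes) -> Optional[bytes]:
    """Return the response of the first rule that fits, or None."""
    for rule in rules:
        if rule.matches(session, request):
            return rule.response
    return None


rules = [
    # Specific rule first: ReadDataByIdentifier (0x22) for identifier 0x0002.
    Rule(session=2, request=b"\x22\x00\x02", response=b"\x62\x00\x02deadbeef1"),
    # General rule last: any other 0x22 request gets a negative response
    # (0x7F, service 0x22, NRC 0x31 requestOutOfRange).
    Rule(session=lambda s: True, request=b"\x22", response=b"\x7f\x22\x31"),
]

print(first_match(rules, 2, b"\x22\x00\x02"))
print(first_match(rules, 2, b"\x22\x00\x09"))
```

If the two rules were swapped, the general rule would shadow the specific one and every `0x22` request would get the negative response, which is why the more specific entry has to come first.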
Here is some code from a unit test file that I'm using: https://github.com/secdev/scapy/blob/master/test/contrib/automotive/ecu_am.uts
The code at the beginning sets up the test system. It should run without problems on a Linux system with Python 3 and Hartkopp's ISO-TP kernel module loaded (check `lsmod`).
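The `lsmod` check can also be done from Python before starting the simulator. This is a small sketch using only the standard library; the function names `module_listed` and `isotp_module_loaded` are made up here, and the module name `can_isotp` is taken from the `grep` pattern in the script above.

```python
import subprocess


def module_listed(lsmod_output: str, name: str) -> bool:
    """Return True if `name` appears in the first column of lsmod output."""
    for line in lsmod_output.splitlines():
        fields = line.split()
        if fields and fields[0] == name:
            return True
    return False


def isotp_module_loaded() -> bool:
    """Run lsmod and look for can_isotp; False if lsmod is missing or fails."""
    try:
        result = subprocess.run(["lsmod"], capture_output=True, text=True, check=True)
    except (OSError, subprocess.CalledProcessError):
        return False
    return module_listed(result.stdout, "can_isotp")


if __name__ == "__main__":
    print("can_isotp loaded:", isotp_module_loaded())
```

Keeping the parsing in `module_listed` separate from the `lsmod` call makes it possible to test the check on a machine without the module.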
I tried again to use `automotive.ecu` directly from the Scapy shell, and this is the behaviour:
>>> load_contrib('automotive.uds')
INFO: Specify "conf.contribs['UDS'] = {'treat-response-pending-as-answer': True}" to treat a negative response 'requestCorrectlyReceived-ResponsePending' as answer of a request.
The default value is False.
>>> load_contrib('automotive.ccp')
>>> load_contrib('automotive.ecu')
ERROR: Loading module scapy.layers.automotive.ecu
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/scapy/main.py", line 188, in load_contrib
importlib.import_module("scapy.contrib." + name)
File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 994, in _gcd_import
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'scapy.contrib.automotive.ecu'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/scapy/main.py", line 143, in _load
mod = importlib.import_module(module)
File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 994, in _gcd_import
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 941, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 994, in _gcd_import
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'scapy.layers.automotive'
>>>
I also tried running the above script with can-isotp loaded. Here is the error I get:
CAN should work now
Traceback (most recent call last):
File "ecu_test.py", line 74, in <module>
[ECUResponse(session=2, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
NameError: name 'ECUResponse' is not defined
I think that, for some reason, my Scapy does not load the `automotive.ecu` package. Any idea?
Yes, it might be that this code is not yet part of the latest release. Could you try pulling from GitHub and installing manually? https://github.com/secdev/scapy.git You may have to remove Scapy from your site-packages manually to ensure that the correct version is started.
Hi, did you have any success?
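To see which installation Python actually picks up, and whether it contains the ECU module, a quick check along these lines may help. It is a sketch using only the standard library; `module_available` and `describe_install` are names invented for this example, and the module path `scapy.contrib.automotive.ecu` is the one shown in the traceback above.

```python
import importlib
import importlib.util


def module_available(name: str) -> bool:
    """Return True if `name` can be located by the import system."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # find_spec imports parent packages and raises if one of them is missing
        return False


def describe_install(package: str = "scapy") -> str:
    """Report the file a package is loaded from, to spot a stale site-packages copy."""
    if not module_available(package):
        return f"{package} is not importable"
    mod = importlib.import_module(package)
    return f"{package} loaded from {getattr(mod, '__file__', 'unknown location')}"


if __name__ == "__main__":
    print(describe_install("scapy"))
    print("automotive.ecu available:", module_available("scapy.contrib.automotive.ecu"))
```

If the reported path points into an old `dist-packages` or `site-packages` directory rather than the fresh checkout, the old copy is still shadowing the new one.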
@polybassa Not really. I did not proceed with the implementation. I found some alternatives here: https://github.com/zombieCraig/UDSim and https://github.com/zombieCraig/uds-server
I also wanted to ask you a few things:
- I have some diagnostic logs from a real car that is in the workshop, containing a couple of requests and responses. Would it be possible to provide these files to the ECU module in Scapy so that it can simulate the real ECU?
- What other features does the ECU module provide, apart from the UDS functionality?
Hi,
- Yes, this was the original use case of the ECU module. I hope the documentation helps: https://scapy.readthedocs.io/en/latest/layers/automotive.html#ecu-utility-examples A list of supported responses can be used as input for an `ECU_am`.
- The ECU module is basically protocol independent. Everything that works for UDS also works for GMLAN.
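As a rough illustration of the log-replay idea, the sketch below pairs recorded requests with their responses. It does not use Scapy's API. It assumes the log has already been decoded into a chronological list of raw UDS payloads, which is an assumption about the input, not a format any tool here produces. It relies only on two UDS conventions: a positive response carries the request's service ID plus 0x40, and a negative response starts with 0x7F followed by the request's service ID. `is_response_to` and `build_response_table` are hypothetical helper names.

```python
from typing import Dict, Iterable, List, Optional


def is_response_to(request: bytes, response: bytes) -> bool:
    """True if `response` is a UDS positive or negative reply to `request`."""
    if not request or not response:
        return False
    sid = request[0]
    if response[0] == sid + 0x40:  # positive response
        return True
    # negative response: 0x7F, echoed service ID, response code
    return len(response) >= 2 and response[0] == 0x7F and response[1] == sid


def build_response_table(payloads: Iterable[bytes]) -> Dict[bytes, List[bytes]]:
    """Map each recorded request to the responses that followed it."""
    table: Dict[bytes, List[bytes]] = {}
    pending: Optional[bytes] = None
    for payload in payloads:
        if pending is not None and is_response_to(pending, payload):
            table.setdefault(pending, []).append(payload)
        else:
            # Anything that is not a reply to the pending request starts a new one.
            pending = payload
    return table


log = [bytes.fromhex(h) for h in (
    "1003", "5003003201f4",              # DiagnosticSessionControl and its reply
    "22f190", "7f2278", "62f190414243",  # response pending, then the final answer
    "2701", "7f2733",                    # SecurityAccess rejected
)]
for request, responses in build_response_table(log).items():
    print(request.hex(), "->", [r.hex() for r in responses])
```

A table like this could then be translated by hand into the list of supported responses that the answering machine expects; how to do that is described in the Scapy documentation linked above.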
Closing this issue, as standalone mock ECUs are more suitable to keep in separate projects.