|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
-
- """A simple Python3 wrapper of the ``dbcc`` library.
-
- It contains a subset of the functions of the ``dbcc`` library, and also
- provides some auxiliary classes to simplify the development process.
-
-
- Typical usage example:
-
- # Create a delegate to receive the encoding/decoding results.
- class MyDelegate(SignalDelegate):
- def __init__(self):
- super().__init__()
-
- def on_encoded(self, msg_id: int, db: bytearray):
- print("msg:", msg_id, ":", db)
-
- def on_decoded(self, msg_id: int, signal_name: str, value: float):
- print("msg:", msg_id, ",", signal_name, ":", value)
-
- # Then start to process the data from log file or real-time CAN bus.
- myhandler = MyDelegate()
-
- dbc_file_path = sys.argv[1]
- sp = SignalProcessor(dbc_file_path)
-
- msg_id = 578
- sp.register_delegate(msg_id, "ACCAccReqValHSC2", myhandler)
-
- # Prepare the CAN dataframe buffer, usually, it is received from CAN bus
- data = bytearray([0x01, 0xA8, 0, 0, 0, 0, 0, 0])
-
- # Do decode and encode, then you will receive callback in handler.
- sp.decode_message(msg_id, data)
- sp.encode_message(msg_id, {"ACCAccReqValHSC2": -5.1})
- """
-
- import logging
- from ._dbcc import DbcParser, GenHelper, version
-
-
# Package metadata: maintainer contact, the names exported by
# ``from <package> import *``, and the version string returned by
# ``version()`` (imported from ``._dbcc``).
__author__ = ["\"donkey\" <anjingyu_ws@foxmail.com>"]
__all__ = [
    "SignalDelegate",
    "SignalProcessor",
]
__version__ = version()
-
-
class SignalDelegate:
    """Signal/Message encode/decode delegate.

    Inherit from this class and override the hooks you are interested in to
    implement your own delegate. Both default implementations do nothing.
    """

    def on_encoded(self, msg_id: int, db: bytearray):
        """Hook called with a message id and the encoded data buffer."""
        return None

    def on_decoded(self, msg_id: int, signal_name: str, value: float):
        """Hook called with a message id, a signal name and its decoded value."""
        return None
-
-
class SignalProcessor:
    """Auxiliary class, help you process the dataframe easily.

    It loads the messages and signals of a DBC file, lets callers register
    one delegate per ``(message id, signal name)`` pair, and dispatches the
    results of decoding/encoding to those delegates.
    """

    PLAIN_FORMAT_STRING = '[{asctime}][{levelname}][{filename}:{lineno}][{funcName}] {message}'

    def __init__(self, dbc_file_path: str):
        """Parse ``dbc_file_path`` and index its messages and signals.

        Messages whose ``dlc`` is 0 are skipped.
        """
        self.__dp = DbcParser(dbc_file_path)
        # msg_id -> (message object, {signal name -> signal object})
        self.__msgs = {}
        # msg_id -> {signal name -> delegate}
        self.__handlers = {}
        self.__logger = self._get_logger()

        for msg in self.__dp:
            if msg.dlc == 0:
                continue
            self.__msgs[msg.id] = (msg, {sig.name: sig for sig in msg})

    @staticmethod
    def _get_logger() -> logging.Logger:
        """Return the shared ``"dbcc"`` logger, attaching a console handler once.

        The logger is process-wide, so adding a handler on every instantiation
        would duplicate each log line once per ``SignalProcessor`` created.
        """
        logger = logging.getLogger("dbcc")
        if not logger.handlers:
            console = logging.StreamHandler()
            console.setFormatter(
                logging.Formatter(SignalProcessor.PLAIN_FORMAT_STRING, style="{")
            )
            console.setLevel(logging.DEBUG)
            logger.addHandler(console)
        return logger

    def register_delegate(self, msg_id: int, sig_name: str, delegate: "SignalDelegate") -> bool:
        """Attach ``delegate`` to signal ``sig_name`` of message ``msg_id``.

        A previous delegate registered for the same pair is replaced.

        Returns:
            True when the message and signal are known and the delegate was
            stored, False otherwise.
        """
        entry = self.__msgs.get(msg_id)
        if entry is None or sig_name not in entry[1]:
            return False
        self.__handlers.setdefault(msg_id, {})[sig_name] = delegate
        return True

    def unregister_delegate(self, msg_id: int, sig_name: str):
        """Detach the delegate of ``(msg_id, sig_name)``; unknown pairs are ignored."""
        sigs = self.__handlers.get(msg_id)
        if sigs is None or sig_name not in sigs:
            return
        del sigs[sig_name]
        # Drop the per-message bucket once its last delegate is gone so that
        # decode/encode can treat "msg_id present" as "has delegates".
        if not sigs:
            del self.__handlers[msg_id]

    def decode_message(self, msg_id: int, data: bytearray):
        """Decode every registered signal of ``msg_id`` from ``data``.

        Each registered delegate receives ``on_decoded(msg_id, signal_name,
        value)``. Messages without registered delegates are ignored.
        """
        sigs = self.__handlers.get(msg_id)
        if not sigs:
            return
        known = self.__msgs[msg_id][1]
        # Iterate over a snapshot so a delegate may unregister itself from
        # inside its callback without invalidating the iteration.
        for sig_name, delegate in list(sigs.items()):
            delegate.on_decoded(msg_id, sig_name, known[sig_name].decode(data))

    def encode_message(self, msg_id: int, values: dict):
        """Encode ``values`` (signal name -> value) into one data buffer.

        Only signals with a registered delegate are encoded; any other name
        is reported with a warning and skipped. Once all signals have been
        written, every delegate involved receives ``on_encoded(msg_id, data)``
        exactly once with the completed buffer. Nothing happens when the
        message has no registered delegates or no signal was encoded.
        """
        sigs = self.__handlers.get(msg_id)
        if not sigs:
            return
        known = self.__msgs[msg_id][1]
        # NOTE(review): the buffer size is fixed at 8 bytes, as in the
        # original code; confirm whether longer payloads must be supported.
        data = bytearray(8)
        to_notify = []
        for sig_name, val in values.items():
            if sig_name not in sigs:
                self.__logger.warning(
                    "Did not register handler for signal (#%s).%s", msg_id, sig_name
                )
                continue
            # Presumably ``encode`` writes the value into ``data`` in place
            # (its return value was never used) -- TODO confirm.
            known[sig_name].encode(val, data)
            delegate = sigs[sig_name]
            # Compare by identity: a delegate shared by several signals must
            # be called back only once per encoded frame.
            if not any(delegate is seen for seen in to_notify):
                to_notify.append(delegate)
        for delegate in to_notify:
            delegate.on_encoded(msg_id, data)
|