--[[ Thin wrapper of dbcc module for lua Author: donkey --]] local dbcc = {} dbcc.ByteOrder = { Motorola = 0, -- 0 big-endbian Intel = 1 -- 1 little-endbian } dbcc.Sign = { Unsigned = 0, Signed = 1 } dbcc.Multiplexor = { None = 0, Multiplexed = 1, Multiplexor = 2 } dbcc.dbcc = require("dbcc") local SignalDelegate = {} function SignalDelegate:new() o = {} setmetatable(o, self) self.__index = self return o end function SignalDelegate:on_encoded(msg_id, db): end function SignalDelegate:on_decoded(msg_id, signal_name, value): end dbcc.SignalDelegate = SignalDelegate local SignalProcessor = {} function SignalProcessor:new(dbc_file_path) o = {dp = dbcc.dbcc.DbcParser:new(dbc_file_path), msgs = {}, handlers = {}} setmetatable(o, self) self.__index = self -- Load messages from DBC file for msg in self.dp do if msg.dlc == 0 then continue end end return o end for msg in self.__dp: if msg.dlc == 0: continue msg_entity = (msg, {}) for sig in msg: msg_entity[1][sig.name] = sig self.__msgs[msg.id] = msg_entity def register_delegate(self, msg_id: int, sig_name: str, delegate: SignalDelegate) -> bool: if msg_id in self.__msgs and sig_name in self.__msgs[msg_id][1]: if msg_id not in self.__handlers: self.__handlers[msg_id] = {} self.__handlers[msg_id][sig_name] = delegate return True return False def unregister_delegate(self, msg_id: int, sig_name: str): if msg_id in self.__handlers and sig_name in self.__handlers[msg_id]: del self.__handlers[msg_id][sig_name] if len(self.__handlers[msg_id]) == 0: del self.__handlers[msg_id] def decode_message(self, msg_id: int, data: bytearray): if msg_id in self.__handlers: sigs = self.__handlers[msg_id] for sig_name, hdl in sigs.items(): sig = self.__msgs[msg_id][1][sig_name] r = sig.decode(data) hdl.on_decoded(msg_id, sig_name, r) def encode_message(self, msg_id: int, values: dict): if msg_id in self.__handlers: sigs = self.__handlers[msg_id] data = bytearray(8) for sig_name, val in values.items(): if sig_name in sigs: sig = self.__msgs[msg_id][1][sig_name] sig.encode(val, data) else: self.__logger.warning("Did not register handler for signal (#{}).{}".format(msg_id, sig_name)) sigs[sig_name].on_encoded(msg_id, data) return dbcc