|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
-
- import matlab.engine
- import socket
- import time
-
- DEFAULT_HOST = "127.0.0.1"
- DEFAULT_PORT = 30000
-
- DEFAULT_MODEL_NAME = 'VTD_2018a'
- DEFAULT_SUBMODEL_NAME = 'BaseParameters'
-
- class SimpleTcpClient:
-
- def __init__(self, host, port):
- self._skt = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
- self._host = host
- self._port = port
-
- def connect(self, sure=False):
- try:
- if not self._skt:
- self._skt = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
- self._skt.connect((self._host, self._port))
- return True
- except Exception as e:
- print("Failed: ", e)
- if sure:
- while not self._reconnect():
- pass
- return True
- else:
- self.close()
- return False
-
- def send(self, msg):
- try:
- self._skt.send(msg)
- return True
- except Exception as e:
- print("Failed: ", e)
- i = 0
- while i < 3:
- self._reconnect()
- try:
- self._skt.send(msg)
- return True
- except:
- pass
- i += 1
- return False
-
- def recv(self, buffer_size=1024):
- try:
- return self._skt.recv(buffer_size)
- except Exception as e:
- print("Failed: ", e)
- i = 0
- while i < 3:
- self._reconnect()
- try:
- return self._skt.recv(buffer_size)
- except:
- pass
- i += 1
- return None
-
- def _reconnect(self, times=3):
- t = 0
- while not self.connect() and t < times:
- self.close()
- time.sleep(1.0)
- t += 1
-
- # Timeout
- if t >= times:
- return False
- else:
- return True
-
- def close(self):
- if self._skt:
- try:
- self._skt.close()
- except Exception as e:
- print("Failed: ", e)
- pass
- finally:
- self._skt = None
- return False
-
- eng = None
- tcp_client = None
-
- def connect():
- global eng
- global tcp_client
-
- engs = matlab.engine.find_matlab()
- if len(engs) != 0:
- eng = matlab.engine.connect_matlab(engs[0])
- eng.load_system('VTD_2018a')
- print(engs)
- else:
- eng = None
- # Attempt to connect the server via TCP
- try:
- tcp_client = SimpleTcpClient(DEFAULT_HOST, DEFAULT_PORT)
- if tcp_client.connect(True):
- print("Server connected: ({}, {})".format(DEFAULT_HOST, DEFAULT_PORT))
- except Exception as e:
- print("Failed: ", e)
- tcp_client = None
-
- def disconnect():
- global eng
- global tcp_client
-
- if eng:
- eng.exit()
- elif tcp_client:
- tcp_client.close()
-
- def get_status() -> str:
- global eng
- global tcp_client
- if eng:
- if eng.get_param(DEFAULT_MODEL_NAME, 'SImulationStatus') == 'running':
- return "running"
- else:
- return "stopped"
- elif tcp_client:
- msg = "get_status"
- tcp_client.send(msg.encode('utf-8'))
- params = tcp_client.recv()
- if not params:
- return "stopped"
- else:
- return params.decode("utf-8")
-
- def get_p() -> tuple:
- global eng
- global tcp_client
- if eng:
- submodel_name = '{}/{}'.format(DEFAULT_MODEL_NAME, DEFAULT_SUBMODEL_NAME)
- x = eng.get_param(submodel_name, 'BaseXOffset')
- y = eng.get_param(submodel_name, 'BaseYOffset')
- h = eng.get_param(submodel_name, 'BaseHeadingOffset')
- return (x, y, h)
- elif tcp_client:
- msg = "get_param"
- tcp_client.send(msg.encode("utf-8"))
- params = tcp_client.recv()
- if not params:
- return (0, 0, 0)
- else:
- params = params.decode("utf-8")
- return tuple(map(lambda x: float(x), params.split(';')))
-
- def set_p(x: str, y: str, h: str, need_start=False):
- global eng
- global tcp_client
- if eng:
- submodel_name = '{}/{}'.format(DEFAULT_MODEL_NAME, DEFAULT_SUBMODEL_NAME)
- eng.set_param(submodel_name, 'BaseXOffset', x, nargout=0)
- eng.set_param(submodel_name, 'BaseYOffset', y, nargout=0)
- eng.set_param(submodel_name, 'BaseHeadingOffset', h, nargout=0)
- if need_start:
- start()
- elif tcp_client:
- msg = ''
- if start:
- msg = ";".join(["set_param", x, y, h, 'start'])
- else:
- msg = ";".join(["set_param", x, y, h])
- tcp_client.send(msg.encode("utf-8"))
-
- def start():
- global eng
- global tcp_client
- if eng:
- if eng.get_param(DEFAULT_MODEL_NAME, 'SImulationStatus') == 'stopped':
- eng.set_param(DEFAULT_MODEL_NAME, 'SimulationCommand', 'start', nargout=0)
- #eng.set_param('VTD_2018a/CarSim_Ego1/Scope', 'Open', 'on', nargout=0)
- elif tcp_client:
- msg = "start"
- tcp_client.send(msg.encode("utf-8"))
-
- def stop():
- global eng
- global tcp_client
- if eng:
- if eng.get_param(DEFAULT_MODEL_NAME, 'SImulationStatus') == 'running':
- eng.set_param(DEFAULT_MODEL_NAME, 'SimulationCommand', 'stop', nargout=0)
- #eng.set_param('VTD_2018a/CarSim_Ego1/Scope', 'Open', 'off', nargout=0)
- elif tcp_client:
- msg = "stop"
- tcp_client.send(msg.encode("utf-8"))
|