|
- #!/usr/bin/env python3
- # -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
- # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
- #
- # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- # Version 2, December 2004
- #
- # Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
- #
- # Everyone is permitted to copy and distribute verbatim or modified
- # copies of this license document, and changing it is allowed as long
- # as the name is changed.
- #
- # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- # TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
- #
- # 0. You just DO WHAT THE FUCK YOU WANT TO.
- #
- # Copyright (c) 2022- donkey <anjingyu_ws@foxmail.com>
-
- import sys
- import os
- import struct
- import math
- import threading
- import queue
-
- from matplotlib.figure import Figure
- import ttkbootstrap as ttk
- from ttkbootstrap.dialogs.dialogs import Dialog
- from ttkbootstrap.constants import *
-
- from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
- import numpy as np
-
- import matplotlib
- import matplotlib.pyplot as plt
- import matplotlib.animation as animation
- from matplotlib.pyplot import autoscale
-
# Select the Tk backend: every scope below is embedded in a Tk widget.
matplotlib.use("TkAgg")

__author__ = ['"donkey" <anjingyu_ws@foxmail.com>']
# Names accepted by ScopeDialog as its ``scope`` argument.
__scopes__ = ["SingleScope", "MultiScope", "RealtimeScope"]
# Public API: the scope widgets plus the dialog that hosts them.
__all__ = [*__scopes__, "ScopeDialog"]
-
class SnapCursor:
    """Crosshair cursor that snaps to the nearest sample along the x axis.

    A dashed vertical line marks the selected x position.  For every series
    in ``ys`` a dashed horizontal line and a text label (positioned in axes
    coordinates) show the y value of that series at the selected index.
    ``x`` must be sorted in ascending order.

    The cursor keeps references to ``x`` and to each series (no copies), so
    data appended to those sequences later is picked up automatically.

    Example::

        fig, ax = plt.subplots()
        ax.plot(t, s, 'o')
        cursor = SnapCursor(ax, t, [s], ['red'])
        fig.canvas.mpl_connect('motion_notify_event', cursor.mouse_move)

    Args:
        ax: Matplotlib axes the cursor artists are added to.
        x: Sorted sequence of x values shared by all series.
        ys: Sequence of y-value sequences, one per series.
        cs: Colors matched to ``ys`` by position.  Series without a matching
            entry fall back to matplotlib's ``"C<n>"`` cycle colors.
    """

    def __init__(self, ax, x, ys=None, cs=None):
        # ``None`` defaults avoid sharing one mutable list between instances.
        ys = [] if ys is None else ys
        cs = [] if cs is None else cs

        self.ax = ax
        self.lx = []
        self.y = []
        self.txt = []
        for i, y in enumerate(ys):
            # Fall back to a cycle color instead of raising IndexError when
            # fewer colors than series were supplied.
            color = cs[i] if i < len(cs) else f"C{i % 10}"
            # the horiz lines
            self.lx.append(self.ax.axhline(c=color, lw=0.5, ls='--'))
            self.y.append(y)
            # text location in axes coords
            self.txt.append(
                self.ax.text(0.72, 0.1 + i * 0.1,
                             '',
                             transform=ax.transAxes,
                             c=color))
        self.ly = self.ax.axvline(c='c', lw=0.5, ls='--')  # the vert line
        self.x = x

    def lower_bound(self, nums, target):
        """Return the index of the first element ``>= target``.

        ``nums`` must be sorted ascending.  When every element is smaller
        than ``target`` the last valid index is returned.

        Raises:
            IndexError: If ``nums`` is empty.
        """
        from bisect import bisect_left

        if len(nums) == 0:
            raise IndexError("cannot search an empty sequence")
        return min(bisect_left(nums, target), len(nums) - 1)

    def upper_bound(self, nums, target):
        """Return the index of the first element ``> target``.

        ``nums`` must be sorted ascending.  When no element is greater than
        ``target`` the last valid index is returned.

        Raises:
            IndexError: If ``nums`` is empty.
        """
        from bisect import bisect_right

        if len(nums) == 0:
            raise IndexError("cannot search an empty sequence")
        return min(bisect_right(nums, target), len(nums) - 1)

    @property
    def line_objs(self):
        """Vertical line followed by the per-series horizontal lines."""
        return [self.ly] + self.lx

    @property
    def text_objs(self):
        """Per-series text labels."""
        return self.txt

    def mouse_press(self, event):
        """Handle a button press exactly like a mouse move."""
        self.mouse_move(event)

    def mouse_move(self, event):
        """Move the crosshair to the sample selected by ``event.xdata``.

        Events outside any axes, or arriving while there is no x data, are
        ignored.
        """
        if not event.inaxes:
            return

        if len(self.x) == 0:
            return

        indx = self.lower_bound(self.x, event.xdata)
        x_val = self.x[indx]
        # update the line positions
        for h_line, label, series in zip(self.lx, self.txt, self.y):
            if indx >= len(series):
                # This series has no sample at the selected index (yet).
                continue
            y_val = series[indx]
            # Line2D setters expect sequences; the lines made by
            # axhline/axvline hold two points, so both get the same value.
            h_line.set_ydata([y_val, y_val])
            label.set_text('x=%1.2f, y=%1.2f' % (x_val, y_val))
        self.ly.set_xdata([x_val, x_val])
-
-
- # Options:
- # {
- # 'subplots': {'title': 'name', 'ylim': [-100, 100], 'xlim': [-100, 100], 'colors': ['red', 'blue'], 'labels': ['line1', 'line2'], 'xlabel': 'x', 'ylabel': 'y', 'autoscale': True, 'cursor': True},
- # 'interval': 30,
- # 'toolbar': True
- # }
class SingleScope(ttk.Frame):
    """Frame embedding a single realtime subplot that is fed through a queue.

    Samples are handed over with :meth:`put_msg`; a ``FuncAnimation`` timer
    drains the queue every ``interval`` milliseconds and forwards the samples
    to the underlying ``RealtimeSubFigure``.

    Args:
        parent: Parent Tk widget.
        config: Mapping with a required ``subplots`` entry (the configuration
            passed to ``RealtimeSubFigure``) and the optional entries
            ``interval`` (animation interval, default 30) and ``toolbar``
            (truthy to add a navigation toolbar).
        **options: Forwarded to ``ttk.Frame``.

    Raises:
        ValueError: If ``config`` has no ``subplots`` entry.
    """

    def __init__(self, parent, config, **options):
        ttk.Frame.__init__(self, parent, **options)
        self._config = config
        self._parent = parent
        self._sub_figs = None
        self._line_objs = []

        if "subplots" not in self._config:
            raise ValueError("subplots is required!")

        self._interval = self._config.get("interval", 30)
        self._msg_queue = queue.Queue()

        self._setup_ui()

    def put_msg(self, timestamp, obj):
        """Queue one sample for display.

        Caution: Always run in sub-thread
        """
        self._msg_queue.put_nowait((timestamp, obj))

    def clear(self):
        """Clear the subplot data, drop all pending messages and redraw."""
        self._sub_fig.clear()

        # The underlying deque is shared with producer threads, so it must
        # only be touched while holding the queue's own lock.
        with self._msg_queue.mutex:
            self._msg_queue.queue.clear()

        self._fig.canvas.draw()
        self._fig.canvas.flush_events()

    def _setup_ui(self):
        """Create the figure, its single subplot and the Tk canvas."""
        # Use Figure directly rather than pyplot: an embedded figure should
        # not be registered with pyplot's global figure manager.
        self._fig = Figure()

        ax = self._fig.add_subplot()
        self._sub_fig = RealtimeSubFigure(ax, self._config["subplots"])
        self._line_objs.extend(self._sub_fig.line_objs)

        self.show()

    def _update_data(self, _):
        """Animation callback: apply every queued sample to the subplot.

        Returns:
            The artists to redraw (required because blitting is enabled).
        """
        while True:
            try:
                timestamp, dataset = self._msg_queue.get_nowait()
            except queue.Empty:
                break
            self._sub_fig.update(timestamp, dataset)

        return self._line_objs

    def show(self):
        """Embed the figure in this frame and start the animation."""
        canvas = FigureCanvasTkAgg(self._fig, self)

        if self._config.get("toolbar"):
            toolbar = NavigationToolbar2Tk(canvas, self)
            toolbar.update()

        canvas.get_tk_widget().pack(side=ttk.BOTTOM, fill=ttk.BOTH, expand=True)

        if self._sub_fig.cursor:
            self._fig.canvas.mpl_connect(
                "button_press_event", self._sub_fig.cursor.mouse_press
            )

        # Keep a reference on self; the animation stops once it is
        # garbage-collected.
        self._animation = animation.FuncAnimation(
            fig=self._fig,
            func=self._update_data,
            interval=self._interval,
            cache_frame_data=False,
            blit=True,
        )
-
-
class SubFigure:
    """Static subplot that draws a fixed set of lines on one axes.

    Args:
        ax: Matplotlib axes to draw on.
        config: Mapping with a required ``lines`` entry: a list of dicts,
            each with ``x`` and ``y`` data and optional ``color`` (default
            ``"blue"``) and ``label``.  The optional ``title``, ``ylabel``
            and ``xlabel`` entries are applied unless ``sharex`` is set.
        sharex: When true, title and axis labels are not set on the axes.

    Raises:
        ValueError: If ``config`` has no ``lines`` entry.
    """

    def __init__(self, ax, config: dict, sharex: bool = False):
        # Subplot
        self.__ax = ax
        self.__line_objs = []
        self.__config = config

        if "lines" not in self.__config:
            raise ValueError("`lines` data is required!")

        self.__lines = self.__config["lines"]

        if not sharex:
            # Each text entry is optional; only the ones present are applied.
            for key, apply in (
                ("title", self.__ax.set_title),
                ("ylabel", self.__ax.set_ylabel),
                ("xlabel", self.__ax.set_xlabel),
            ):
                if key in self.__config:
                    apply(self.__config[key])

        for line in self.__lines:
            options = {"color": line.get("color", "blue")}
            if "label" in line:
                options["label"] = line["label"]

            self.__line_objs += self.__ax.plot(line["x"], line["y"], **options)

    def add_legend(self, names: list = None):
        """Add a legend in the upper-right corner of the axes.

        Args:
            names: Legend entries, matched to the plotted lines by position.
                When omitted or empty, the explicit labels of the lines are
                used; lines without one are left out of the legend.
        """
        handles = self.__line_objs
        if not names:
            # Matplotlib gives unlabeled artists a label starting with "_";
            # those are not meant to be shown in a legend.
            handles = [
                obj for obj in self.__line_objs
                if not str(obj.get_label()).startswith("_")
            ]
            names = [obj.get_label() for obj in handles]

        self.__ax.legend(
            handles,
            names,
            loc="upper right",
        )
-
-
class MultiScope(ttk.Frame):
    """Frame embedding several static subplots in one figure.

    Args:
        parent: Parent Tk widget.
        config: Mapping with a required ``subplots`` list (one ``SubFigure``
            configuration per subplot) and these optional entries:

            * ``layout`` - ``"sharex"`` to stack all subplots in one column
              sharing the x axis, or a dict with ``row``, ``column`` and
              ``pos`` (one ``[row_index, column_index]`` pair per subplot,
              used to index the grid spec).  When omitted a grid layout is
              generated.
            * ``toolbar`` - truthy to add a navigation toolbar.
        **options: Forwarded to ``ttk.Frame``.

    Raises:
        ValueError: If ``subplots`` is missing, if an explicit layout does
            not provide exactly one position per subplot, or if a layout has
            to be generated for zero or more than 36 subplots.
    """

    def __init__(self, parent, config, **options):
        super().__init__(parent, **options)
        # Shallow copy: a generated layout is stored in the configuration and
        # must not leak into the caller's dict.
        self.__config = dict(config)
        self.__sharex = False

        self.__sub_figs = []

        if "subplots" not in self.__config:
            raise ValueError("`subplots` is required!")

        if "layout" not in self.__config:
            self.__config["layout"] = self.__layout()
        elif self.__config["layout"] == "sharex":
            self.__sharex = True
        else:
            pos_count = len(self.__config["layout"]["pos"])
            subplot_count = len(self.__config["subplots"])
            if pos_count != subplot_count:
                raise ValueError(
                    "Length of layout.pos={}, length of subplots={}, are not equal!".format(
                        pos_count, subplot_count
                    )
                )
        self.__setup_ui()

    def __sharex_figures(self):
        """Stack all subplots in a single column with a shared x axis."""
        subplots = self.__config["subplots"]

        # Use Figure directly rather than pyplot: an embedded figure should
        # not be registered with pyplot's global figure manager.
        self.__fig = Figure()
        # squeeze=False always yields a 2-D array, so a single subplot needs
        # no special case.
        axes = self.__fig.subplots(len(subplots), 1, sharex="all", squeeze=False)
        # Remove the vertical gap between the stacked sub-figures
        self.__fig.subplots_adjust(left=0.05, right=0.7, hspace=0)

        for ax, sub_config in zip(axes[:, 0], subplots):
            my_sub_figure = SubFigure(ax, sub_config, True)
            my_sub_figure.add_legend([sub_config["ylabel"]])
            self.__sub_figs.append(my_sub_figure)

        self.show()

    def __layout_figures(self):
        """Place every subplot on the grid cell given by ``layout['pos']``."""
        layout = self.__config["layout"]

        self.__fig = Figure(figsize=(5, 4), constrained_layout=True, dpi=100)
        gs = self.__fig.add_gridspec(layout["row"], layout["column"])

        for sub_config, pos in zip(self.__config["subplots"], layout["pos"]):
            ax = self.__fig.add_subplot(gs[pos[0], pos[1]])
            my_sub_figure = SubFigure(ax, sub_config)
            ax.grid(True)
            self.__sub_figs.append(my_sub_figure)

        self.show()

    def __setup_ui(self):
        """Build the figure according to the selected layout mode."""
        if self.__sharex:
            self.__sharex_figures()
        else:
            self.__layout_figures()

    def __layout(self):
        """Generate a simple grid layout for the configured subplots.

        Up to three subplots are stacked in one column.  Otherwise the row
        count is the smallest integer whose square covers the subplot count
        and the column count is just large enough to hold all subplots.

        Returns:
            Dict with ``row``, ``column`` and ``pos`` (one pair of slices per
            subplot, filled row by row).

        Raises:
            ValueError: For zero or more than 36 subplots.
        """
        sub_plot_num = len(self.__config["subplots"])

        if sub_plot_num == 0:
            raise ValueError("At least one subplot is required!")

        if sub_plot_num > 36:
            raise ValueError(
                "Too many subplots, the count should be no more than 36!"
            )

        if sub_plot_num < 4:
            row, col = sub_plot_num, 1
        else:
            # Smallest row count in 2..6 whose square covers every subplot.
            row = next(i for i in range(2, 7) if i * i >= sub_plot_num)
            # Ceiling division without going through floats.
            col = (sub_plot_num + row - 1) // row

        pos = []
        for index in range(sub_plot_num):
            r, c = divmod(index, col)
            pos.append([slice(r, r + 1), slice(c, c + 1)])

        return {"row": row, "column": col, "pos": pos}

    def show(self):
        """Embed the figure (and optional toolbar) in this frame."""
        canvas = FigureCanvasTkAgg(self.__fig, self)

        if self.__config.get("toolbar"):
            toolbar = NavigationToolbar2Tk(canvas, self)
            toolbar.update()

        canvas.get_tk_widget().pack(side=BOTTOM, fill=BOTH, expand=True)
-
-
- # options:
- # {
- # 'subplots': [{'title': 'name', 'ylim': [-100, 100], 'xlim': [-100, 100], 'colors': ['red', 'blue'], 'labels': ['line1', 'line2'], 'xlabel': 'x', 'ylabel': 'y', 'autoscale': True, 'cursor': True}],
- # 'layout': {'row': 3, 'column': 3, 'pos': [[]]},
- # 'interval': 30,
- # 'toolbar': True
- # }
class RealtimeSubFigure:
    """One axes whose lines grow as samples arrive.

    Args:
        ax: Matplotlib axes to draw on.
        config: Mapping with these optional entries:

            * ``colors`` and ``labels`` - one line is created per color;
              ``labels`` is required whenever ``colors`` is present and must
              provide at least one label per color.  Without ``colors`` a
              single line in matplotlib's default color is created.
            * ``title``, ``xlabel``, ``ylabel`` - axes texts.
            * ``xlim``, ``ylim`` - initial ``[min, max]`` axis limits.
            * ``autoscale`` - truthy to grow the view with the data.
            * ``cursor`` - truthy to attach a ``SnapCursor``.

    Raises:
        ValueError: If ``colors`` is given without ``labels`` or with fewer
            labels than colors.
    """

    def __init__(self, ax, config: dict):
        # Subplot
        self._ax = ax
        self._config = config
        # One [x_values, y_values] pair per line.
        self._lines = []
        self._line_objs = []
        colors = []
        labels = []

        if "colors" in self._config:
            if "labels" not in self._config:
                raise ValueError("colors and labels must be existent at the same time!")

            colors = list(self._config["colors"])
            labels = list(self._config["labels"])
            if len(labels) < len(colors):
                raise ValueError(
                    "labels must provide one entry per color: "
                    "{} labels for {} colors".format(len(labels), len(colors))
                )
            self._lines = [[[], []] for _ in colors]

        self._autoscale = bool(self._config.get("autoscale"))

        # Data extents seen so far as [min, max].  Infinite sentinels make
        # the first sample always win, whatever its magnitude.
        self._xlim = [math.inf, -math.inf]
        self._ylim = [math.inf, -math.inf]

        for key, apply in (
            ("title", self._ax.set_title),
            ("xlabel", self._ax.set_xlabel),
            ("ylabel", self._ax.set_ylabel),
        ):
            if key in self._config:
                apply(self._config[key])

        if "xlim" in self._config:
            self._ax.set_xlim(self._config["xlim"][0], self._config["xlim"][1])

        if "ylim" in self._config:
            self._ax.set_ylim(self._config["ylim"][0], self._config["ylim"][1])

        self._ax.grid(True)

        if not self._lines:
            # Only one line
            self._lines.append([[], []])
            self._line_objs.append(
                self._ax.plot(self._lines[0][0], self._lines[0][1])[0]
            )
        else:
            for data, color, label in zip(self._lines, colors, labels):
                self._line_objs.append(
                    self._ax.plot(data[0], data[1], color=color, label=label)[0]
                )

            self._ax.legend(
                self._line_objs,
                [obj.get_label() for obj in self._line_objs],
                loc="upper right",
            )

        self._cursor = None

        if self._config.get("cursor"):
            ys = [data[1] for data in self._lines]
            # Without configured colors, reuse the colors matplotlib picked
            # for the plotted lines so the cursor gets one color per series.
            cursor_colors = colors or [obj.get_color() for obj in self._line_objs]
            self._cursor = SnapCursor(self._ax, self._lines[0][0], ys, cursor_colors)

            # The cursor artists are redrawn together with the data lines.
            self._line_objs.extend(self._cursor.line_objs)
            self._line_objs.extend(self._cursor.text_objs)

    @property
    def cursor(self):
        """The attached ``SnapCursor`` or ``None``."""
        return self._cursor

    def update(self, timestamp, lines):
        """Append one sample per line and refresh the line artists.

        Args:
            timestamp: x value shared by all samples of this update.
            lines: One y value per line, in line order.  Values beyond the
                number of configured lines are ignored.

        Returns:
            All artists of this subplot (data lines, then cursor artists).
        """
        x = timestamp
        # The data lines are the leading entries of _line_objs, so zipping
        # with _lines pairs every data list with its artist.
        for data, line_obj, y in zip(self._lines, self._line_objs, lines):
            data[0].append(x)
            data[1].append(y)

            if self._autoscale:
                # Update the tracked data extents
                self._xlim[0] = min(self._xlim[0], x)
                self._xlim[1] = max(self._xlim[1], x)
                self._ylim[0] = min(self._ylim[0], y)
                self._ylim[1] = max(self._ylim[1], y)

            line_obj.set_data(data[0], data[1])

        self._do_autoscale()

        return self._line_objs

    def _do_autoscale(self):
        """Snap the view to the data extents once data leaves the view.

        Does nothing unless autoscaling is enabled.  An axis is only touched
        when the tracked extents exceed its current limits; a full canvas
        redraw is triggered when any limit changed.
        """
        if not self._autoscale:
            return

        lo, hi = self._ax.get_xlim()
        bt, tp = self._ax.get_ylim()

        need_update_x = lo > self._xlim[0] or hi < self._xlim[1]
        need_update_y = bt > self._ylim[0] or tp < self._ylim[1]

        if need_update_x:
            self._ax.set_xlim(*self._xlim)

        if need_update_y:
            self._ax.set_ylim(*self._ylim)

        if need_update_x or need_update_y:
            self._ax.figure.canvas.draw()

    @property
    def line_objs(self) -> list:
        """All artists of this subplot (data lines, then cursor artists)."""
        return self._line_objs

    def clear(self):
        """Drop all samples and reset the line artists and tracked extents."""
        for data in self._lines:
            # Clear in place: the cursor holds references to these lists.
            data[0].clear()
            data[1].clear()

        # Only the leading entries are data lines; the rest belongs to the
        # cursor.  The artists keep their own copy of the data, so they must
        # be reset explicitly.
        for line_obj in self._line_objs[:len(self._lines)]:
            line_obj.set_data([], [])

        self._xlim = [math.inf, -math.inf]
        self._ylim = [math.inf, -math.inf]

    def dsize(self):
        """Return ``[len(x_values), len(y_values)]`` for every line."""
        return [[len(data[0]), len(data[1])] for data in self._lines]
-
-
class RealtimeScope(ttk.Frame):
    """Frame embedding several realtime subplots fed through one queue.

    Every message queued with :meth:`put_msg` carries one timestamp and a
    sequence holding one dataset per subplot; a ``FuncAnimation`` timer
    drains the queue every ``interval`` milliseconds.

    Args:
        parent: Parent Tk widget.
        config: Mapping with a required ``subplots`` list (one
            ``RealtimeSubFigure`` configuration per subplot) and these
            optional entries:

            * ``layout`` - dict with ``row``, ``column`` and ``pos`` (one
              ``[row_index, column_index]`` pair per subplot, used to index
              the grid spec).  When omitted the subplots are stacked in a
              single column.
            * ``interval`` - animation interval, default 30.
            * ``toolbar`` - truthy to add a navigation toolbar.
        **options: Forwarded to ``ttk.Frame``.

    Raises:
        ValueError: If ``subplots`` is missing or the layout does not provide
            exactly one position per subplot.
    """

    def __init__(self, parent, config, **options):
        ttk.Frame.__init__(self, parent, **options)
        self._config = config
        self._parent = parent
        self._sub_figs = []
        self._line_objs = []

        if "subplots" not in self._config:
            raise ValueError("subplots is required!")

        subplot_count = len(self._config["subplots"])

        if "layout" in self._config:
            self._layout = self._config["layout"]
            pos_count = len(self._layout["pos"])
            if pos_count != subplot_count:
                raise ValueError(
                    "Length of layout.pos={}, length of subplots={}, are not equal!".format(
                        pos_count, subplot_count
                    )
                )
        else:
            # No layout given: stack the subplots in one column.  The grid
            # needs at least one row even when there is nothing to show.
            self._layout = {
                "row": max(subplot_count, 1),
                "column": 1,
                "pos": [
                    [slice(i, i + 1), slice(0, 1)] for i in range(subplot_count)
                ],
            }

        self._interval = self._config.get("interval", 30)
        self._msg_queue = queue.Queue()

        self._setup_ui()

    def put_msg(self, timestamp, obj):
        """Queue one message for display.

        Caution: Always run in sub-thread
        """
        self._msg_queue.put_nowait((timestamp, obj))

    def clear(self):
        """Clear every subplot, drop all pending messages and redraw."""
        for sub_fig in self._sub_figs:
            sub_fig.clear()

        # The underlying deque is shared with producer threads, so it must
        # only be touched while holding the queue's own lock.
        with self._msg_queue.mutex:
            self._msg_queue.queue.clear()

        self._fig.canvas.draw()
        self._fig.canvas.flush_events()

    def _setup_ui(self):
        """Create the figure and one realtime subplot per configuration."""
        # Use Figure directly rather than pyplot: an embedded figure should
        # not be registered with pyplot's global figure manager.
        self._fig = Figure(figsize=(5, 4), constrained_layout=True, dpi=100)

        gs = self._fig.add_gridspec(self._layout["row"], self._layout["column"])

        for sub_config, pos in zip(self._config["subplots"], self._layout["pos"]):
            ax = self._fig.add_subplot(gs[pos[0], pos[1]])
            my_sub_figure = RealtimeSubFigure(ax, sub_config)
            self._sub_figs.append(my_sub_figure)
            self._line_objs.extend(my_sub_figure.line_objs)

        self.show()

    def _update_data(self, _):
        """Animation callback: apply every queued message to the subplots.

        Datasets are matched to subplots by position; datasets beyond the
        number of subplots are ignored.

        Returns:
            The artists to redraw (required because blitting is enabled).
        """
        while True:
            try:
                timestamp, dataset = self._msg_queue.get_nowait()
            except queue.Empty:
                break
            for sub_fig, sub_dataset in zip(self._sub_figs, dataset):
                sub_fig.update(timestamp, sub_dataset)

        # TODO(anjingyu): Update the display range of X axis
        # ax.set_xlim(newrange_min, newrange_max)

        return self._line_objs

    def show(self):
        """Embed the figure in this frame and start the animation."""
        canvas = FigureCanvasTkAgg(self._fig, self)

        if self._config.get("toolbar"):
            toolbar = NavigationToolbar2Tk(canvas, self)
            toolbar.update()

        canvas.get_tk_widget().pack(side=ttk.BOTTOM, fill=ttk.BOTH, expand=True)

        for sub_fig in self._sub_figs:
            if sub_fig.cursor:
                self._fig.canvas.mpl_connect(
                    "button_press_event", sub_fig.cursor.mouse_press
                )

        # Keep a reference on self; the animation stops once it is
        # garbage-collected.
        self._animation = animation.FuncAnimation(
            fig=self._fig,
            func=self._update_data,
            interval=self._interval,
            cache_frame_data=False,
            blit=True,
        )
-
-
class ScopeDialog(ttk.Window):
    """Top-level window that hosts a single scope widget.

    ``scope`` names one of the classes listed in ``__scopes__``; the
    ``"Scope"`` suffix may be omitted.  All extra keyword arguments are
    handed to the scope as its ``config`` mapping.

    Raises:
        RuntimeError: If ``scope`` does not name a known scope class.
    """

    def __init__(self, scope="SingleScope", title="", **options):
        scope_name = scope if scope.endswith("Scope") else f"{scope}Scope"

        if scope_name not in __scopes__:
            raise RuntimeError(
                f"{scope_name} is not a valid scope name: {__scopes__}"
            )

        super().__init__(title=title)

        width, height = 800, 660

        self.__resize_center(width, height)
        self.minsize(width, height)
        self.protocol("WM_DELETE_WINDOW", self.__quit)

        # The name was validated above, so the module-level lookup is safe.
        scope_cls = globals()[scope_name]
        self.__scope = scope_cls(self, config=options)
        self.__scope.pack(expand=True, fill=BOTH)

    def __resize_center(self, w, h):
        """Resize the window to ``w`` x ``h`` and center it on the screen."""
        screen_w = self.winfo_screenwidth()
        screen_h = self.winfo_screenheight()

        # Top-left corner that puts the window in the middle of the screen.
        left = int((screen_w / 2) - (w / 2))
        top = int((screen_h / 2) - (h / 2))
        self.geometry(f'{w}x{h}+{left}+{top}')

    def __quit(self):
        """Window-close handler: destroy the window and leave the main loop."""
        self.destroy()
        self.quit()
-
-
def parse_file(filepath: str) -> list:
    """Read a binary file and return its content as a list of byte values.

    Args:
        filepath: Path of the file to read.

    Returns:
        One integer (0-255) per byte, in file order; empty for an empty file.
    """
    # The context manager closes the handle even when reading fails.
    with open(filepath, 'rb') as stream:
        return list(stream.read())
-
# Script entry: plot the bytes of each given file as one subplot, all
# subplots sharing the x axis (the byte index).
if __name__ == '__main__':
    args = sys.argv[1:]

    if not args:
        print("Usage: scope 1.bin 2.bin 3.bin ...")
        sys.exit(0)

    # One subplot per file, named after the file without its extension.
    names = [os.path.splitext(os.path.basename(f))[0] for f in args]
    dataset_list = [parse_file(os.path.abspath(f)) for f in args]

    # Append padding data so every signal spans the same x range
    max_x = max(len(data) for data in dataset_list)
    dataset_list = [data + [0] * (max_x - len(data)) for data in dataset_list]

    # NOTE: a leftover debug dump (printing the index of every zero byte)
    # followed by an unconditional sys.exit(0) used to sit here and made the
    # plotting below unreachable; it has been removed.

    colors = [
        'green',
        'blue',
        'red',
        'cyan',
        'magenta',
        'yellow',
    ]

    xdata = list(range(max_x))

    subplots = []
    for idx, (name, ydata) in enumerate(zip(names, dataset_list)):
        subplots.append({
            'title': name,
            'xlabel': 'Frame Num',
            'ylabel': name,
            'lines': [{
                'x': xdata,
                'y': ydata,
                # Only the color wraps around; the name always belongs to
                # the file at this position.
                'color': colors[idx % len(colors)],
                'label': name
            }]
        })

    options = {
        'subplots': subplots,
        'layout': 'sharex',
        'toolbar': True,
    }

    sd = ScopeDialog("MultiScope", "Signals", **options)

    sd.mainloop()
|