树莓派部分:
python代码
#!/usr/bin/env python #!/usr/bin/env python # pip install websocket-server from http.server import SimpleHTTPRequestHandler, HTTPServer from urllib.parse import urlparse, parse_qs import cv2 import time import base64 import json import os import serial import struct import numpy as np import RPi.GPIO as GPIO import threading os.system('sudo chmod 666 /dev/video0') cap = cv2.VideoCapture(0) # 打开摄像头 GPIO.setmode(GPIO.BCM) # 定义GPIO引脚 limit_switch_pins = { 17: '1', 18: '2', 27: '3', 22: '4', 23: '5' } # 设置默认状态值 limit_switch_status = { '1': False, '2': False, '3': False, '4': False, '5': False } #五个电机对应的can_id canid = { '1': 121, '2': 122, '3': 123, '4': 124, '5': 125 } # 设置GPIO引脚为输入,并启用内部上拉电阻 for pin in limit_switch_pins.keys(): GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) def monitor_limit_switches(): try: while True: all_triggered = True for pin, name in limit_switch_pins.items(): can_id=canid[name] if GPIO.input(pin) == GPIO.LOW: limit_switch_status[name] = True #JOG_san(ser, can_id,0)#没有限位器,先不运行 #ZERO_MECH(ser, can_id)#没有限位器,先不运行 else: limit_switch_status[name] = False #JOG_san(ser, can_id,1) #没有限位器,先不运行 all_triggered = False if all_triggered: print("已经在准备模式") break time.sleep(2) except KeyboardInterrupt: print("监听线程中断") def create_no_signal_image(): # 创建一张黑色图像 img = np.zeros((480, 640, 3), dtype=np.uint8) # 在图像中心添加“NO SIGNAL”文字 font = cv2.FONT_HERSHEY_SIMPLEX text = "NO SIGNAL" font_scale = 2 thickness = 3 color = (0, 0, 255) # 红色文字 # 计算文字大小和位置 text_size = cv2.getTextSize(text, font, font_scale, thickness)[0] text_x = (img.shape[1] - text_size[0]) // 2 text_y = (img.shape[0] + text_size[1]) // 2 # 在图像上绘制文字 cv2.putText(img, text, (text_x, text_y), font, font_scale, color, thickness) return img # 创建“NO SIGNAL”图像 def bytes_to_text(cmd_x): cmd_dict = { '00': ' 获取设备 ID', '01': '运控模式电机控制指令', '02': '电机反馈数据', # ... 其他命令映射 ... '90': '设置参数' } return cmd_dict.get(cmd_x, cmd_x) # 返回对应的命令或默认为字节 def value_to_bytes(value): high_byte = 0xDC + (value - 123) * 8 return '{:02x} 08'.format(high_byte) def float_to_bytes(f): byte_val = struct.pack('<f', f) return ' '.join('{:02x}'.format(b) for b in byte_val) def int_to_reversed_hex(value): hex_str = "{:04x}".format(value) return ' '.join([hex_str[i:i+2].upper() for i in range(2, -1, -2)]) def bytes_to_value(value): hex_str = "{:04x}".format(value) if not isinstance(value, str) else value high_byte = int(hex_str[:2], 16) return 123 + (high_byte - 0xDC) // 8 def extract_info(parsed_command): return parsed_command[4:6], parsed_command[10:14], parsed_command[14:18], parsed_command[22:30] def bytes_to_float(byte_str): bytes_val = bytes.fromhex(byte_str.replace(' ', '')) float_val = struct.unpack('<f', bytes_val)[0] return round(float_val, 4) def reverse_hex_bytes(hex_str): return "0x" + hex_str[2:] + hex_str[:2] def initialize_serial_port(): result = os.popen("sudo ls -l /dev/ttyUSB*").read() com = result.split()[-1] os.system(f"sudo chmod 777 {com}") os.system(f"sudo fuser -k {com}") ser = serial.Serial(com, 921600, timeout=1) ser.write(bytes.fromhex('41 54 2b 41 54 0d 0a')) return ser def send_command(ser, can_id, cmd_mode, index, value): can_id = int(can_id) value = float(value) id_num, index_num = value_to_bytes(can_id), int_to_reversed_hex(index) value_num = float_to_bytes(value) date = f"41 54 {cmd_mode} 07 eb {id_num} {index_num} 00 00 {value_num} 0d 0a" #print(f"发送L91指令:{cmd_mode}至CAN{can_id}:索引{index},值{value}") try: ser.write(bytes.fromhex(date)) response = ser.readline() hex_string = ' '.join([response[i:i+1].hex() for i in range(len(response))]) cmd_x, can_id_x, index_x, value_x = extract_info( hex_string.replace(" ", "")) cmd_int, can_id_int = bytes_to_text(cmd_x), bytes_to_value(can_id_x) index_int, value_int = reverse_hex_bytes( index_x), bytes_to_float(value_x) #print(f"接收内容《类型: {cmd_int} canid: {can_id_int} index: {index_int} value: {value_int}》RX: {hex_string}") except Exception as e: print(f"出现错误: {e}") return hex_string def send_int(ser, date): ser.write(bytes.fromhex(date)) response = ser.readline() hex_string = ' '.join([response[i:i+1].hex() for i in range(len(response))]) cmd_x, can_id_x, index_x, value_x = extract_info( hex_string.replace(" ", "")) cmd_int, can_id_int = bytes_to_text(cmd_x), bytes_to_value(can_id_x) index_int, value_int = reverse_hex_bytes(index_x), bytes_to_float(value_x) print(f"接收内容《类型: {cmd_int} canid: {can_id_int} index: {index_int} value: {value_int}》RX: {hex_string}") return hex_string def JOG_san(ser, can_id,zd_type):#速度0.5模式正负转与停 can_id = int(can_id) id_num = value_to_bytes(can_id) if zd_type==1: date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 01 82 21 0d 0a" elif zd_type==0: date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 00 7f ff 0d 0a" elif zd_type==-1: date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 01 7d dd 0d 0a" ser.write(bytes.fromhex(date)) def ZERO_MECH(ser, can_id):#设置零点 can_id = int(can_id) id_num = value_to_bytes(can_id) date=f"41 54 30 07 eb {id_num} 01 01 00 00 00 00 00 00 0d 0a" ser.write(bytes.fromhex(date)) def get_img(x=640, y=360): if not cap.isOpened(): print("Error: Could not open camera.") frame = no_signal_image else: ret, frame = cap.read() if not ret: print("Error: Could not read frame. Returning 'NO SIGNAL' image.") frame = no_signal_image # 保持原始颜色空间,不进行转换 scaled_frame = cv2.resize(frame, (x, y), interpolation=cv2.INTER_AREA) image_bytes = cv2.imencode('.jpg', scaled_frame)[1].tobytes() encoded_image = base64.b64encode(image_bytes).decode('utf-8') return encoded_image class MyHandler(SimpleHTTPRequestHandler): def do_GET(self): global canid parsed_path = urlparse(self.path) query_params = parse_qs(parsed_path.query) if self.path == '/': self.send_response(200) self.send_header("Content-type", "text/html") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(b"Hello, World!") elif self.path.startswith('/hello/'): name = self.path.split('/')[-1] self.send_response(200) self.send_header("Content-type", "text/html") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() message = f"Hello, {name}!".encode('utf-8') self.wfile.write(message) elif parsed_path.path == '/greet': id = query_params.get('id', [''])[0] rad = query_params.get('rad', [''])[0] value = query_params.get('value', [''])[0] can_id=canid[id] #print('速度限制') send_command(ser, can_id, '90', 28695, rad) #print('角度限制') send_command(ser, can_id, '90', 28694, value) message = f'can_id{can_id},rad{rad},value{value}' html_content = f''' Image Page{message}''' self.send_response(200) self.send_header("Content-type", "text/html") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(html_content.encode('utf-8')) elif parsed_path.path == '/wzms': id = query_params.get('id', [''])[0] can_id=canid[id] id_num=value_to_bytes(can_id) print('位置模式') send_int(ser, f'41 54 90 07 eb {id_num} 05 70 00 00 01 00 00 00 0d 0a') print('设置模式1') send_command(ser, can_id, '18', 28677, 1) html_content = '位置模式' self.send_response(200) self.send_header("Content-type", "text/html") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(html_content.encode('utf-8')) elif parsed_path.path == '/res': id = query_params.get('id', [''])[0] can_id=canid[id] id_num=value_to_bytes(can_id) print('重置模式') html_content = '重置模式' send_int(ser, f'41 54 20 07 eb {id_num} 00 00 00 00 00 00 00 00 0d 0a') self.send_response(200) self.send_header("Content-type", "text/html") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(html_content.encode('utf-8')) elif parsed_path.path == '/img': # HTML 内容包含 标签 x = int(query_params.get('x', [''])[0]) y = int(query_params.get('y', [''])[0]) ca_img=get_img(x,y) html_content = ca_img self.send_response(200) self.send_header("Content-type", "text/html") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(html_content.encode('utf-8')) else: self.send_response(404) self.end_headers() self.wfile.write(b"404 Not Found") def run(server_class=HTTPServer, handler_class=MyHandler, port=8000): server_address = ('', port) httpd = server_class(server_address, handler_class) print(f"Starting httpd server on port {port}") httpd.serve_forever() if __name__ == '__main__': no_signal_image = create_no_signal_image() ser = initialize_serial_port() monitoring_thread = threading.Thread(target=monitor_limit_switches) monitoring_thread.start() run()