QQ扫码 加入我们
如果您对这篇文章满意,欢迎支付宝赞助

树莓派部分:
python代码
#!/usr/bin/env python
#!/usr/bin/env python
# pip install websocket-server
from http.server import SimpleHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs
import cv2
import time
import base64
import json
import os
import serial
import struct
import numpy as np
import RPi.GPIO as GPIO
import threading
os.system('sudo chmod 666 /dev/video0')
cap = cv2.VideoCapture(0) # 打开摄像头
GPIO.setmode(GPIO.BCM)
# 定义GPIO引脚
limit_switch_pins = {
17: '1',
18: '2',
27: '3',
22: '4',
23: '5'
}
# 设置默认状态值
limit_switch_status = {
'1': False,
'2': False,
'3': False,
'4': False,
'5': False
}
#五个电机对应的can_id
canid = {
'1': 121,
'2': 122,
'3': 123,
'4': 124,
'5': 125
}
# 设置GPIO引脚为输入,并启用内部上拉电阻
for pin in limit_switch_pins.keys():
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def monitor_limit_switches():
try:
while True:
all_triggered = True
for pin, name in limit_switch_pins.items():
can_id=canid[name]
if GPIO.input(pin) == GPIO.LOW:
limit_switch_status[name] = True
#JOG_san(ser, can_id,0)#没有限位器,先不运行
#ZERO_MECH(ser, can_id)#没有限位器,先不运行
else:
limit_switch_status[name] = False
#JOG_san(ser, can_id,1) #没有限位器,先不运行
all_triggered = False
if all_triggered:
print("已经在准备模式")
break
time.sleep(2)
except KeyboardInterrupt:
print("监听线程中断")
def create_no_signal_image():
# 创建一张黑色图像
img = np.zeros((480, 640, 3), dtype=np.uint8)
# 在图像中心添加“NO SIGNAL”文字
font = cv2.FONT_HERSHEY_SIMPLEX
text = "NO SIGNAL"
font_scale = 2
thickness = 3
color = (0, 0, 255) # 红色文字
# 计算文字大小和位置
text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
text_x = (img.shape[1] - text_size[0]) // 2
text_y = (img.shape[0] + text_size[1]) // 2
# 在图像上绘制文字
cv2.putText(img, text, (text_x, text_y), font, font_scale, color, thickness)
return img
# 创建“NO SIGNAL”图像
def bytes_to_text(cmd_x):
cmd_dict = {
'00': ' 获取设备 ID', '01': '运控模式电机控制指令', '02': '电机反馈数据',
# ... 其他命令映射 ...
'90': '设置参数'
}
return cmd_dict.get(cmd_x, cmd_x) # 返回对应的命令或默认为字节
def value_to_bytes(value):
high_byte = 0xDC + (value - 123) * 8
return '{:02x} 08'.format(high_byte)
def float_to_bytes(f):
byte_val = struct.pack('<f', f)
return ' '.join('{:02x}'.format(b) for b in byte_val)
def int_to_reversed_hex(value):
hex_str = "{:04x}".format(value)
return ' '.join([hex_str[i:i+2].upper() for i in range(2, -1, -2)])
def bytes_to_value(value):
hex_str = "{:04x}".format(value) if not isinstance(value, str) else value
high_byte = int(hex_str[:2], 16)
return 123 + (high_byte - 0xDC) // 8
def extract_info(parsed_command):
return parsed_command[4:6], parsed_command[10:14], parsed_command[14:18], parsed_command[22:30]
def bytes_to_float(byte_str):
bytes_val = bytes.fromhex(byte_str.replace(' ', ''))
float_val = struct.unpack('<f', bytes_val)[0]
return round(float_val, 4)
def reverse_hex_bytes(hex_str):
return "0x" + hex_str[2:] + hex_str[:2]
def initialize_serial_port():
result = os.popen("sudo ls -l /dev/ttyUSB*").read()
com = result.split()[-1]
os.system(f"sudo chmod 777 {com}")
os.system(f"sudo fuser -k {com}")
ser = serial.Serial(com, 921600, timeout=1)
ser.write(bytes.fromhex('41 54 2b 41 54 0d 0a'))
return ser
def send_command(ser, can_id, cmd_mode, index, value):
can_id = int(can_id)
value = float(value)
id_num, index_num = value_to_bytes(can_id), int_to_reversed_hex(index)
value_num = float_to_bytes(value)
date = f"41 54 {cmd_mode} 07 eb {id_num} {index_num} 00 00 {value_num} 0d 0a"
#print(f"发送L91指令:{cmd_mode}至CAN{can_id}:索引{index},值{value}")
try:
ser.write(bytes.fromhex(date))
response = ser.readline()
hex_string = ' '.join([response[i:i+1].hex()
for i in range(len(response))])
cmd_x, can_id_x, index_x, value_x = extract_info(
hex_string.replace(" ", ""))
cmd_int, can_id_int = bytes_to_text(cmd_x), bytes_to_value(can_id_x)
index_int, value_int = reverse_hex_bytes(
index_x), bytes_to_float(value_x)
#print(f"接收内容《类型: {cmd_int} canid: {can_id_int} index: {index_int} value: {value_int}》RX: {hex_string}")
except Exception as e:
print(f"出现错误: {e}")
return hex_string
def send_int(ser, date):
ser.write(bytes.fromhex(date))
response = ser.readline()
hex_string = ' '.join([response[i:i+1].hex()
for i in range(len(response))])
cmd_x, can_id_x, index_x, value_x = extract_info(
hex_string.replace(" ", ""))
cmd_int, can_id_int = bytes_to_text(cmd_x), bytes_to_value(can_id_x)
index_int, value_int = reverse_hex_bytes(index_x), bytes_to_float(value_x)
print(f"接收内容《类型: {cmd_int} canid: {can_id_int} index: {index_int} value: {value_int}》RX: {hex_string}")
return hex_string
def JOG_san(ser, can_id,zd_type):#速度0.5模式正负转与停
can_id = int(can_id)
id_num = value_to_bytes(can_id)
if zd_type==1:
date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 01 82 21 0d 0a"
elif zd_type==0:
date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 00 7f ff 0d 0a"
elif zd_type==-1:
date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 01 7d dd 0d 0a"
ser.write(bytes.fromhex(date))
def ZERO_MECH(ser, can_id):#设置零点
can_id = int(can_id)
id_num = value_to_bytes(can_id)
date=f"41 54 30 07 eb {id_num} 01 01 00 00 00 00 00 00 0d 0a"
ser.write(bytes.fromhex(date))
def get_img(x=640, y=360):
if not cap.isOpened():
print("Error: Could not open camera.")
frame = no_signal_image
else:
ret, frame = cap.read()
if not ret:
print("Error: Could not read frame. Returning 'NO SIGNAL' image.")
frame = no_signal_image
# 保持原始颜色空间,不进行转换
scaled_frame = cv2.resize(frame, (x, y), interpolation=cv2.INTER_AREA)
image_bytes = cv2.imencode('.jpg', scaled_frame)[1].tobytes()
encoded_image = base64.b64encode(image_bytes).decode('utf-8')
return encoded_image
class MyHandler(SimpleHTTPRequestHandler):
def do_GET(self):
global canid
parsed_path = urlparse(self.path)
query_params = parse_qs(parsed_path.query)
if self.path == '/':
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(b"Hello, World!")
elif self.path.startswith('/hello/'):
name = self.path.split('/')[-1]
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
message = f"Hello, {name}!".encode('utf-8')
self.wfile.write(message)
elif parsed_path.path == '/greet':
id = query_params.get('id', [''])[0]
rad = query_params.get('rad', [''])[0]
value = query_params.get('value', [''])[0]
can_id=canid[id]
#print('速度限制')
send_command(ser, can_id, '90', 28695, rad)
#print('角度限制')
send_command(ser, can_id, '90', 28694, value)
message = f'can_id{can_id},rad{rad},value{value}'
html_content = f'''
Image Page{message}'''
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(html_content.encode('utf-8'))
elif parsed_path.path == '/wzms':
id = query_params.get('id', [''])[0]
can_id=canid[id]
id_num=value_to_bytes(can_id)
print('位置模式')
send_int(ser, f'41 54 90 07 eb {id_num} 05 70 00 00 01 00 00 00 0d 0a')
print('设置模式1')
send_command(ser, can_id, '18', 28677, 1)
html_content = '位置模式'
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(html_content.encode('utf-8'))
elif parsed_path.path == '/res':
id = query_params.get('id', [''])[0]
can_id=canid[id]
id_num=value_to_bytes(can_id)
print('重置模式')
html_content = '重置模式'
send_int(ser, f'41 54 20 07 eb {id_num} 00 00 00 00 00 00 00 00 0d 0a')
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(html_content.encode('utf-8'))
elif parsed_path.path == '/img':
# HTML 内容包含 标签
x = int(query_params.get('x', [''])[0])
y = int(query_params.get('y', [''])[0])
ca_img=get_img(x,y)
html_content = ca_img
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(html_content.encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
self.wfile.write(b"404 Not Found")
def run(server_class=HTTPServer, handler_class=MyHandler, port=8000):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
print(f"Starting httpd server on port {port}")
httpd.serve_forever()
if __name__ == '__main__':
no_signal_image = create_no_signal_image()
ser = initialize_serial_port()
monitoring_thread = threading.Thread(target=monitor_limit_switches)
monitoring_thread.start()
run()
扫描二维码联系管理员