QQ群:336150653

捐赠赞助

最新文档

树莓派机械臂控制代码
更新于 2024-08-05 22:09:45

树莓派部分:

python代码

#!/usr/bin/env python
#!/usr/bin/env python
# pip install websocket-server
from http.server import SimpleHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs
import cv2
import time
import base64
import json
import os
import serial
import struct
import numpy as np
import RPi.GPIO as GPIO
import threading
os.system('sudo chmod 666 /dev/video0')
cap = cv2.VideoCapture(0)  # 打开摄像头
GPIO.setmode(GPIO.BCM)
# 定义GPIO引脚
limit_switch_pins = {
    17: '1',
    18: '2',
    27: '3',
    22: '4',
    23: '5'
}
# 设置默认状态值
limit_switch_status = {
    '1': False,
    '2': False,
    '3': False,
    '4': False,
    '5': False
}
#五个电机对应的can_id
canid = {
    '1': 121,
    '2': 122,
    '3': 123,
    '4': 124,
    '5': 125
}
# 设置GPIO引脚为输入,并启用内部上拉电阻
for pin in limit_switch_pins.keys():
    GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def monitor_limit_switches():
    try:
        while True:
            all_triggered = True
            for pin, name in limit_switch_pins.items():
                can_id=canid[name]
                if GPIO.input(pin) == GPIO.LOW:
                    limit_switch_status[name] = True
                    #JOG_san(ser, can_id,0)#没有限位器,先不运行
                    #ZERO_MECH(ser, can_id)#没有限位器,先不运行
                else:
                    limit_switch_status[name] = False
                    #JOG_san(ser, can_id,1) #没有限位器,先不运行
                    all_triggered = False
            if all_triggered:
                print("已经在准备模式")
                break
            time.sleep(2)
    except KeyboardInterrupt:
        print("监听线程中断")
def create_no_signal_image():
    # 创建一张黑色图像
    img = np.zeros((480, 640, 3), dtype=np.uint8)

    # 在图像中心添加“NO SIGNAL”文字
    font = cv2.FONT_HERSHEY_SIMPLEX
    text = "NO SIGNAL"
    font_scale = 2
    thickness = 3
    color = (0, 0, 255)  # 红色文字

    # 计算文字大小和位置
    text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
    text_x = (img.shape[1] - text_size[0]) // 2
    text_y = (img.shape[0] + text_size[1]) // 2

    # 在图像上绘制文字
    cv2.putText(img, text, (text_x, text_y), font, font_scale, color, thickness)

    return img

# 创建“NO SIGNAL”图像
def bytes_to_text(cmd_x):
    cmd_dict = {
        '00': ' 获取设备 ID', '01': '运控模式电机控制指令', '02': '电机反馈数据',
        # ... 其他命令映射 ...
        '90': '设置参数'
    }
    return cmd_dict.get(cmd_x, cmd_x)  # 返回对应的命令或默认为字节


def value_to_bytes(value):
    high_byte = 0xDC + (value - 123) * 8
    return '{:02x} 08'.format(high_byte)
def float_to_bytes(f):
    byte_val = struct.pack('<f', f)
    return ' '.join('{:02x}'.format(b) for b in byte_val)


def int_to_reversed_hex(value):
    hex_str = "{:04x}".format(value)
    return ' '.join([hex_str[i:i+2].upper() for i in range(2, -1, -2)])
def bytes_to_value(value):
    hex_str = "{:04x}".format(value) if not isinstance(value, str) else value
    high_byte = int(hex_str[:2], 16)
    return 123 + (high_byte - 0xDC) // 8
def extract_info(parsed_command):
    return parsed_command[4:6], parsed_command[10:14], parsed_command[14:18], parsed_command[22:30]
def bytes_to_float(byte_str):
    bytes_val = bytes.fromhex(byte_str.replace(' ', ''))
    float_val = struct.unpack('<f', bytes_val)[0]
    return round(float_val, 4)
def reverse_hex_bytes(hex_str):
    return "0x" + hex_str[2:] + hex_str[:2]
def initialize_serial_port():
    result = os.popen("sudo ls -l /dev/ttyUSB*").read()
    com = result.split()[-1]
    os.system(f"sudo chmod 777 {com}")
    os.system(f"sudo fuser -k {com}")
    ser = serial.Serial(com, 921600, timeout=1)
    ser.write(bytes.fromhex('41 54 2b 41 54 0d 0a'))
    return ser
def send_command(ser, can_id, cmd_mode, index, value):
    can_id = int(can_id)
    value = float(value)
    id_num, index_num = value_to_bytes(can_id), int_to_reversed_hex(index)
    value_num = float_to_bytes(value)
    date = f"41 54 {cmd_mode} 07 eb {id_num} {index_num} 00 00 {value_num} 0d 0a"
    #print(f"发送L91指令:{cmd_mode}至CAN{can_id}:索引{index},值{value}")

    try:
        ser.write(bytes.fromhex(date))
        response = ser.readline()
        hex_string = ' '.join([response[i:i+1].hex()
                              for i in range(len(response))])
        cmd_x, can_id_x, index_x, value_x = extract_info(
            hex_string.replace(" ", ""))
        cmd_int, can_id_int = bytes_to_text(cmd_x), bytes_to_value(can_id_x)
        index_int, value_int = reverse_hex_bytes(
            index_x), bytes_to_float(value_x)
        #print(f"接收内容《类型: {cmd_int} canid: {can_id_int} index: {index_int} value: {value_int}》RX: {hex_string}")
    except Exception as e:
        print(f"出现错误: {e}")

    return hex_string
def send_int(ser, date):
    ser.write(bytes.fromhex(date))
    response = ser.readline()
    hex_string = ' '.join([response[i:i+1].hex()
                          for i in range(len(response))])
    cmd_x, can_id_x, index_x, value_x = extract_info(
        hex_string.replace(" ", ""))
    cmd_int, can_id_int = bytes_to_text(cmd_x), bytes_to_value(can_id_x)
    index_int, value_int = reverse_hex_bytes(index_x), bytes_to_float(value_x)
    print(f"接收内容《类型: {cmd_int} canid: {can_id_int} index: {index_int} value: {value_int}》RX: {hex_string}")
    return hex_string
def JOG_san(ser, can_id,zd_type):#速度0.5模式正负转与停
    can_id = int(can_id)
    id_num = value_to_bytes(can_id)
    if zd_type==1:
        date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 01 82 21 0d 0a"
    elif zd_type==0:
        date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 00 7f ff 0d 0a"
    elif zd_type==-1:
        date=f"41 54 90 07 eb {id_num} 05 70 00 00 07 01 7d dd 0d 0a"
    ser.write(bytes.fromhex(date))
def ZERO_MECH(ser, can_id):#设置零点
    can_id = int(can_id)
    id_num = value_to_bytes(can_id)
    date=f"41 54 30 07 eb {id_num} 01 01 00 00 00 00 00 00 0d 0a"
    ser.write(bytes.fromhex(date))
def get_img(x=640, y=360):
    if not cap.isOpened():
        print("Error: Could not open camera.")
        frame = no_signal_image
    else:
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame. Returning 'NO SIGNAL' image.")
            frame = no_signal_image
        # 保持原始颜色空间,不进行转换
    scaled_frame = cv2.resize(frame, (x, y), interpolation=cv2.INTER_AREA)
    image_bytes = cv2.imencode('.jpg', scaled_frame)[1].tobytes()
    encoded_image = base64.b64encode(image_bytes).decode('utf-8')
    return encoded_image
class MyHandler(SimpleHTTPRequestHandler):
    def do_GET(self):
        global canid
        parsed_path = urlparse(self.path)
        query_params = parse_qs(parsed_path.query)
        if self.path == '/':
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(b"Hello, World!")
        elif self.path.startswith('/hello/'):
            name = self.path.split('/')[-1]
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            message = f"Hello, {name}!".encode('utf-8')
            self.wfile.write(message)
        elif parsed_path.path == '/greet':
            id = query_params.get('id', [''])[0]
            rad = query_params.get('rad', [''])[0]
            value = query_params.get('value', [''])[0]
            can_id=canid[id]
            #print('速度限制')
            send_command(ser, can_id, '90', 28695, rad)
            #print('角度限制')
            send_command(ser, can_id, '90', 28694, value)
            message = f'can_id{can_id},rad{rad},value{value}'
            html_content = f'''
            Image Page{message}'''
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(html_content.encode('utf-8'))
        elif parsed_path.path == '/wzms':
            id = query_params.get('id', [''])[0]
            can_id=canid[id]
            id_num=value_to_bytes(can_id)
            print('位置模式')
            send_int(ser, f'41 54 90 07 eb {id_num} 05 70 00 00 01 00 00 00 0d 0a')
            print('设置模式1')
            send_command(ser, can_id, '18', 28677, 1)
            
            html_content = '位置模式'
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(html_content.encode('utf-8'))
        elif parsed_path.path == '/res':
            id = query_params.get('id', [''])[0]
            can_id=canid[id]
            id_num=value_to_bytes(can_id)
            print('重置模式')
            html_content = '重置模式'
            send_int(ser, f'41 54 20 07 eb {id_num} 00 00 00 00 00 00 00 00 0d 0a')
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(html_content.encode('utf-8'))
        elif parsed_path.path == '/img':
            # HTML 内容包含  标签
            x = int(query_params.get('x', [''])[0])
            y = int(query_params.get('y', [''])[0])
            ca_img=get_img(x,y)
            html_content = ca_img
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(html_content.encode('utf-8'))
        else:
            self.send_response(404)
            self.end_headers()
            self.wfile.write(b"404 Not Found")
def run(server_class=HTTPServer, handler_class=MyHandler, port=8000):
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f"Starting httpd server on port {port}")
    httpd.serve_forever()
if __name__ == '__main__':
    no_signal_image = create_no_signal_image()
    ser = initialize_serial_port()
    monitoring_thread = threading.Thread(target=monitor_limit_switches)
    monitoring_thread.start()
    run()


技术微信号:Mayer-Huang