# defMTest.py 7.4 KB
# Hand Tracing Module
import cv2
import mediapipe as mp
import time
import base64    
import requests
import json
import configparser
import ctypes
import sys
#from PIL import Image
#import io
#import numpy as np

class handDetector:
    """Hand-landmark detector built on MediaPipe Hands, plus upload helpers.

    Besides detection (``findHands`` / ``findPosition``) the class bundles the
    helpers the capture loop needs: Base64 image conversion, posting a frame
    to an HTTP endpoint, and reading ``config.ini``.
    """

    def __init__(self, mode=False, maxHands=2, detectionCon=0.5, trackCon=0.5):
        """Create the MediaPipe Hands solution with the given settings.

        Args:
            mode: Passed to MediaPipe as ``static_image_mode``.
            maxHands: Passed to MediaPipe as ``max_num_hands``.
            detectionCon: Passed as ``min_detection_confidence``.
            trackCon: Passed as ``min_tracking_confidence``.
        """
        self.mode = mode
        self.maxHands = maxHands
        self.detectionCon = detectionCon
        self.trackCon = trackCon
        # Last result of ``findHands``; None until the first frame is processed.
        self.results = None
        self.mpHands = mp.solutions.hands
        # Keyword arguments are required here: newer MediaPipe releases insert
        # ``model_complexity`` as the third positional parameter, so passing
        # the confidences positionally would bind them to the wrong option.
        self.hands = self.mpHands.Hands(
            static_image_mode=self.mode,
            max_num_hands=self.maxHands,
            min_detection_confidence=self.detectionCon,
            min_tracking_confidence=self.trackCon,
        )
        self.mpDraw = mp.solutions.drawing_utils

    def findHands(self, img, draw=True):
        """Run hand detection on a BGR frame and optionally draw the skeleton.

        The MediaPipe result is stored on ``self.results`` for ``findPosition``.

        Args:
            img: BGR image (it is converted with ``cv2.COLOR_BGR2RGB``).
            draw: When true, landmarks and connections are drawn onto ``img``.

        Returns:
            The same ``img`` object (drawn on in place when ``draw`` is true).
        """
        imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.results = self.hands.process(imgRGB)
        if draw and self.results.multi_hand_landmarks:
            for handLms in self.results.multi_hand_landmarks:
                self.mpDraw.draw_landmarks(img, handLms,
                                           self.mpHands.HAND_CONNECTIONS)
        return img

    def findPosition(self, img, handNo=-1, draw=True):
        """Convert the last detection's landmarks to pixel coordinates.

        Args:
            img: Image whose height/width scale the normalized landmarks; also
                the drawing target when ``draw`` is true.
            handNo: ``-1`` collects the landmarks of every detected hand; any
                other value indexes a single hand in the detection list.
            draw: When true, a filled circle is drawn on each landmark.

        Returns:
            A list of ``[landmark_index, x_pixel, y_pixel]`` entries. The list
            is empty when ``findHands`` has not produced any hands yet or when
            ``handNo`` does not refer to a detected hand.
        """
        # getattr keeps this safe even if findHands has never been called.
        results = getattr(self, "results", None)
        detected = getattr(results, "multi_hand_landmarks", None)
        if not detected:
            return []

        if handNo == -1:
            selected = detected
        else:
            try:
                selected = [detected[handNo]]
            except IndexError:
                # Fewer hands in this frame than the caller asked for.
                return []

        # Only height and width are needed, so 2-D (grayscale) images work too.
        h, w = img.shape[:2]
        lmList = []
        for hand in selected:
            for idx, lm in enumerate(hand.landmark):
                cx, cy = int(lm.x * w), int(lm.y * h)
                lmList.append([idx, cx, cy])
                if draw:
                    cv2.circle(img, (cx, cy), 8, (255, 0, 255), cv2.FILLED)
        return lmList

    def image_to_base64(self, img):
        """Encode an image as JPEG and return it as a Base64 text string."""
        _, img_encoded = cv2.imencode('.jpg', img)
        base64_data = base64.b64encode(img_encoded)
        return base64_data.decode("utf-8")

    def base64_to_image(self, base64_string, save_path):
        """Decode a Base64 image string, save it to disk and return the image.

        Args:
            base64_string: Base64 text of an encoded image.
            save_path: Destination file path handed to ``cv2.imwrite``.

        Returns:
            The decoded image, or ``None`` when decoding fails. If decoding
            succeeds but the file cannot be written, the failure is printed
            and the decoded image is still returned.
        """
        # Imported here because the module-level numpy import is commented
        # out; without it every call failed with NameError and returned None.
        import numpy as np

        try:
            # Base64 text -> raw bytes -> uint8 buffer -> decoded image.
            image_data = base64.b64decode(base64_string)
            np_array = np.frombuffer(image_data, np.uint8)
            image = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
            if image is None:
                raise ValueError("data could not be decoded as an image")
            # cv2.imwrite reports failure via its return value; do not claim
            # success unless the file was actually written.
            if cv2.imwrite(save_path, image):
                print(f"Image saved successfully at {save_path}")
            else:
                print("An error occurred during Base64 to image conversion: "
                      f"could not write {save_path}")
            return image
        except Exception as e:
            print(f"An error occurred during Base64 to image conversion: {e}")
            return None

    def save_input_to_txt(self, input_str, file_path):
        """Write ``input_str`` to ``file_path`` as UTF-8, overwriting the file.

        Errors are printed rather than raised.
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(input_str)
            print("数据已成功保存到txt文件中。")
        except Exception as e:
            print(f"保存数据到txt文件时出错:{e}")

    def post_string_to_http_api(self, api_url, input_str, points, timeout=10):
        """POST an image string and its landmark list to ``api_url`` as JSON.

        The payload has the keys ``input`` (the string), ``points`` (the
        landmark list serialized to a JSON string) and ``pointLen`` (its
        length).

        Args:
            api_url: Endpoint to post to.
            input_str: Image data as text (the caller passes Base64).
            points: JSON-serializable landmark list.
            timeout: Seconds to wait for the server before giving up, so an
                unresponsive server cannot block the caller indefinitely.

        Returns:
            The response body text on HTTP 200, otherwise ``None``. Errors
            are printed rather than raised.
        """
        try:
            headers = {'Content-Type': 'application/json'}
            data = {
                'input': input_str,
                "points": json.dumps(points),
                "pointLen": len(points),
            }
            response = requests.post(api_url, headers=headers,
                                     data=json.dumps(data), timeout=timeout)

            if response.status_code == 200:
                print(f"Request ok:{response.status_code}")
                return response.text
            print(f"Request failed with status code {response.status_code}")
            return None
        except requests.exceptions.RequestException as e:
            # Connection errors, timeouts and other request-level failures.
            print(f"An error occurred during the request: {e}")
            return None
        except Exception as e:
            # Anything else, e.g. a payload that cannot be serialized.
            print(f"An error occurred: {e}")
            return None

    def readConfig(self, config_path='config.ini'):
        """Read the camera index and API base URL from an INI file.

        Expects ``[Camera] index`` and ``[API] url``.

        Args:
            config_path: Path of the INI file; defaults to ``config.ini``
                relative to the current working directory.

        Returns:
            ``[camera_index, api_url]``. If the file or either option is
            missing or invalid, the error is printed and the defaults
            ``[0, "http://localhost:8765"]`` are returned.
        """
        try:
            config = configparser.ConfigParser()
            # A missing file is not an error for read(); the lookups below
            # then raise and the defaults are used.
            config.read(config_path)

            camera_index = config.getint('Camera', 'index')
            api_url = config.get('API', 'url')
            print(f"Camera Index: {camera_index}")
            print(f"API URL: {api_url}")
            return [camera_index, api_url]
        except Exception as e:
            print(f"readconfig error: {e}")
            return [0, "http://localhost:8765"]
def is_admin():
    """Return True when the process has Windows administrator rights.

    Returns False when the check cannot be performed, for example when
    ``ctypes.windll`` is unavailable because the platform is not Windows.
    """
    try:
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed.
        return False
def main():
    """Capture frames, detect hands, and upload each frame with its landmarks.

    If the process is not running as administrator it relaunches itself
    through the Windows "runas" verb and returns. Otherwise it reads the
    camera index and API base URL via ``handDetector.readConfig`` and loops:
    read a frame, detect and draw hands, Base64-encode the frame, and POST it
    with the landmark list to ``<api base>/rest/api/v1/imgUpload``. The loop
    pauses 0.2 seconds between frames and stops when a frame cannot be read.
    """
    if not is_admin():
        print("not admin , need run as admin")
        # The parameters argument is a command line, so the script path is
        # quoted to survive directories that contain spaces.
        ctypes.windll.shell32.ShellExecuteW(
            None, "runas", sys.executable, f'"{__file__}"', None, 1)
        return

    # Configure the detector once at construction instead of calling
    # __init__ a second time on a live instance, which built a second
    # MediaPipe graph and abandoned the first.
    detector = handDetector(False, 2, 0.5, 0.5)
    camera_index, api_base = detector.readConfig()
    api_url = api_base + "/rest/api/v1/imgUpload"

    cap = cv2.VideoCapture(camera_index)
    try:
        while True:
            success, img = cap.read()
            if not success:
                print("camera read error")
                break
            img = detector.findHands(img)
            lmList = detector.findPosition(img)
            input_str = detector.image_to_base64(img)
            detector.post_string_to_http_api(api_url, input_str, lmList)
            time.sleep(0.2)
    finally:
        # Always hand the camera back, including on Ctrl+C or an error.
        cap.release()

# Start the capture loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()