利用pyserial实现读卡器开发

主要通过蓝牙协议,通过 pyserial 这个python模块来实现蓝牙数据的收发。

读卡器设备为华视电子的CVR-100B这款身份证读卡器。官网:http://www.chinaidcard.com/archives/72/

首先把蓝牙连接上,然后找到蓝牙对应的设备名称:

ls /dev/tty*

/dev/tty.CVR-100B-SPPDev
/dev/tty                         
/dev/tty.Bluetooth-Incoming-Port
/dev/tty.CVR-100B-SPPDev 

找到设备号为 `/dev/tty.CVR-100B-SPPDev `, 这个一会代码里面连接会用到。

然后直接连接:

   def __init__(self, port_name=None):
        self.ser = serial.Serial()
        self.ser.baudrate = self.baudrate
        if port_name:
            self.port_name = port_name
        self.ser.port = self.port_name
        self.ser.timeout = 1

向蓝牙接口写命令数据:

cmd_request = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x20, 0x01, 0x22]
self.ser.open()
self.ser.write(serial.to_bytes(self.cmd_request))

读取数据的方法:

    def serial_read(self):
        bytes_data = self.ser.read(1)
        while True:
            bytes_data += self.ser.read(self.ser.in_waiting)
            time.sleep(0.1)
            if self.ser.in_waiting<=0:
                break
        return bytes_data

根据官方的例子,需要先发送两止指令,才能读取卡的数据:

    cmd_request = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x20, 0x01, 0x22]
    cmd_select = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x20, 0x02, 0x21]
    cmd_read = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x30, 0x01, 0x32]
    cmd_sam = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x12, 0xFF, 0xEE]

    def read(self):
        info = None
        try:
            self.ser.open()
            self.ser.write(serial.to_bytes(self.cmd_request))
            buffer1 = self.serial_read()

            self.ser.write(serial.to_bytes(self.cmd_select))
            buffer2 = self.serial_read()

            self.ser.write(serial.to_bytes(self.cmd_read))
            buffer0 = self.serial_read()

            info = self.decode_info(buffer0)
        except Exception as e:
            gen_log.error(e)
        finally:
            self.ser.close()

        return info

decode_info 方法就是把读取到的数据,按照协议解析成文本。

这一步有一个编码的坑,一直按照utf-8的思路去 encode, decode,都不行。

最后才发现编码为 utf-16。

    info_pos_map = {
        'name': [14, 30], # 开始位置,长度
        'sex': [44, 2],
        'nation': [46, 4],
        'birthday': [50, 16],
        'address': [66, 70],
        'idcode': [136, 36],
        'department': [176, 30],
        'sdate': [202, 16],
        'tdate': [218, 16],
        # 'photo': [270, 1024],
    }

    def decode_info(self, data):
        info = {}
        for name, pos in self.info_pos_map.items():
            if name!='photo':
                info[name] = data[pos[0]:sum(pos)].decode('utf-16').strip()
            else:
                info[name] = data[pos[0]:sum(pos)]
        info['nation'] = self._decode_nation(info['nation'])
        info['sex'] = self._decode_sex(info['sex'])
        return info

完整代码

#coding=utf-8

import serial
import time

from axmservice.lib.log import gen_log

nation_code_map = {
    "01": u"汉",
    "02": u"蒙古",
    "03": u"回",
    "04": u"藏",
    "05": u"维吾尔",
    "06": u"苗",
    "07": u"彝",
    "08": u"壮",
    "09": u"布依",
    "10": u"朝鲜",
    "11": u"满",
    "12": u"侗",
    "13": u"瑶",
    "14": u"白",
    "15": u"土家",
    "16": u"哈尼",
    "17": u"哈萨克",
    "18": u"傣",
    "19": u"黎",
    "20": u"傈僳",
    "21": u"佤",
    "22": u"畲",
    "23": u"高山",
    "24": u"拉祜",
    "25": u"水",
    "26": u"东乡",
    "27": u"纳西",
    "28": u"景颇",
    "29": u"柯尔克孜",
    "30": u"土",
    "31": u"达斡尔",
    "32": u"仫佬",
    "33": u"羌",
    "34": u"布朗",
    "35": u"撒拉",
    "36": u"毛南",
    "37": u"仡佬",
    "38": u"锡伯",
    "39": u"阿昌",
    "40": u"普米",
    "41": u"塔吉克",
    "42": u"怒",
    "43": u"乌孜别克",
    "44": u"俄罗斯",
    "45": u"鄂温克",
    "46": u"德昂",
    "47": u"保安",
    "48": u"裕固",
    "49": u"京",
    "50": u"塔塔尔",
    "51": u"独龙",
    "52": u"鄂伦春",
    "53": u"赫哲",
    "54": u"门巴",
    "55": u"珞巴",
    "56": u"基诺",
}

class IDCardReaderCVR100U(object):
    '''华视身份证读卡器, 型号 CVR-100B
    先通过蓝牙连接上读卡器, 然后  ls /dev/tty* 找到对应的串口名称, 实例化的时候传递进来
    '''

    baudrate = 9600
    port_name = '/dev/tty.CVR-100B-SPPDev'
    cmd_request = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x20, 0x01, 0x22]
    cmd_select = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x20, 0x02, 0x21]
    cmd_read = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x30, 0x01, 0x32]
    cmd_sam = [0xAA, 0xAA, 0xAA, 0x96, 0x69, 0x00, 0x03, 0x12, 0xFF, 0xEE]

    info_pos_map = {
        'name': [14, 30], # 开始位置,长度
        'sex': [44, 2],
        'nation': [46, 4],
        'birthday': [50, 16],
        'address': [66, 70],
        'idcode': [136, 36],
        'department': [176, 30],
        'sdate': [202, 16],
        'tdate': [218, 16],
        # 'photo': [270, 1024],
    }

    def __init__(self, port_name=None):
        self.ser = serial.Serial()
        self.ser.baudrate = self.baudrate
        if port_name:
            self.port_name = port_name
        self.ser.port = self.port_name
        self.ser.timeout = 1

    def decode_info(self, data):
        info = {}
        for name, pos in self.info_pos_map.items():
            if name!='photo':
                info[name] = data[pos[0]:sum(pos)].decode('utf-16').strip()
            else:
                info[name] = data[pos[0]:sum(pos)]
        info['nation'] = self._decode_nation(info['nation'])
        info['sex'] = self._decode_sex(info['sex'])
        return info

    def _decode_sex(self, data):
        if data == '1':
            return u'男'
        elif data == '2':
            return u'女'
        elif data == '9':
            return u'其它'
        else: return ''

    def _decode_nation(self, data):
        return nation_code_map.get(data, '')

    def serial_read(self):
        bytes_data = self.ser.read(1)
        while True:
            bytes_data += self.ser.read(self.ser.in_waiting)
            time.sleep(0.1)
            if self.ser.in_waiting<=0:
                break
        return bytes_data

    def read(self):
        info = None
        try:
            self.ser.open()
            self.ser.write(serial.to_bytes(self.cmd_request))
            buffer1 = self.serial_read()

            self.ser.write(serial.to_bytes(self.cmd_select))
            buffer2 = self.serial_read()

            self.ser.write(serial.to_bytes(self.cmd_read))
            buffer0 = self.serial_read()

            info = self.decode_info(buffer0)
        except Exception as e:
            gen_log.error(e)
        finally:
            self.ser.close()

        return info