win32comport_demo.py
5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# This is a simple serial port terminal demo.
#
# Its primary purpose is to demonstrate the native serial port access offered via
# win32file.
# It uses 3 threads:
# - The main thread, which cranks up the other 2 threads, then simply waits for them to exit.
# - The user-input thread - blocks waiting for a keyboard character, and when found sends it
# out the COM port. If the character is Ctrl+C, it stops, signalling the COM port thread to stop.
# - The COM port thread is simply listening for input on the COM port, and prints it to the screen.
# This demo uses userlapped IO, so that none of the read or write operations actually block (however,
# in this sample, the very next thing we do _is_ block - so it shows off the concepts even though it
# doesnt exploit them.
from win32file import * # The base COM port and file IO functions.
from win32event import * # We use events and the WaitFor[Multiple]Objects functions.
import win32con # constants.
import msvcrt # For the getch() function.
import threading
import sys
def FindModem():
# Snoop over the comports, seeing if it is likely we have a modem.
for i in range(1,5):
port = "COM%d" % (i,)
try:
handle = CreateFile(port,
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
0, # exclusive access
None, # no security
win32con.OPEN_EXISTING,
win32con.FILE_ATTRIBUTE_NORMAL,
None)
# It appears that an available COM port will always success here,
# just return 0 for the status flags. We only care that it has _any_ status
# flags (and therefore probably a real modem)
if GetCommModemStatus(handle) != 0:
return port
except error:
pass # No port, or modem status failed.
return None
# A basic synchronous COM port file-like object
class SerialTTY:
def __init__(self, port):
if type(port)==type(0):
port = "COM%d" % (port,)
self.handle = CreateFile(port,
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
0, # exclusive access
None, # no security
win32con.OPEN_EXISTING,
win32con.FILE_ATTRIBUTE_NORMAL | win32con.FILE_FLAG_OVERLAPPED,
None)
# Tell the port we want a notification on each char.
SetCommMask(self.handle, EV_RXCHAR)
# Setup a 4k buffer
SetupComm(self.handle, 4096, 4096)
# Remove anything that was there
PurgeComm(self.handle, PURGE_TXABORT | PURGE_RXABORT | PURGE_TXCLEAR | PURGE_RXCLEAR )
# Setup for overlapped IO.
timeouts = 0xFFFFFFFF, 0, 1000, 0, 1000
SetCommTimeouts(self.handle, timeouts)
# Setup the connection info.
dcb = GetCommState( self.handle )
dcb.BaudRate = CBR_115200
dcb.ByteSize = 8
dcb.Parity = NOPARITY
dcb.StopBits = ONESTOPBIT
SetCommState(self.handle, dcb)
print("Connected to %s at %s baud" % (port, dcb.BaudRate))
def _UserInputReaderThread(self):
overlapped = OVERLAPPED()
overlapped.hEvent = CreateEvent(None, 1, 0, None)
try:
while 1:
ch = msvcrt.getch()
if ord(ch)==3:
break
WriteFile(self.handle, ch, overlapped)
# Wait for the write to complete.
WaitForSingleObject(overlapped.hEvent, INFINITE)
finally:
SetEvent(self.eventStop)
def _ComPortThread(self):
overlapped = OVERLAPPED()
overlapped.hEvent = CreateEvent(None, 1, 0, None)
while 1:
# XXX - note we could _probably_ just use overlapped IO on the win32file.ReadFile() statement
# XXX but this tests the COM stuff!
rc, mask = WaitCommEvent(self.handle, overlapped)
if rc == 0: # Character already ready!
SetEvent(overlapped.hEvent)
rc = WaitForMultipleObjects([overlapped.hEvent, self.eventStop], 0, INFINITE)
if rc == WAIT_OBJECT_0:
# Some input - read and print it
flags, comstat = ClearCommError( self.handle )
rc, data = ReadFile(self.handle, comstat.cbInQue, overlapped)
WaitForSingleObject(overlapped.hEvent, INFINITE)
sys.stdout.write(data)
else:
# Stop the thread!
# Just incase the user input thread uis still going, close it
sys.stdout.close()
break
def Run(self):
self.eventStop = CreateEvent(None, 0, 0, None)
# Start the reader and writer threads.
user_thread = threading.Thread(target = self._UserInputReaderThread)
user_thread.start()
com_thread = threading.Thread(target = self._ComPortThread)
com_thread.start()
user_thread.join()
com_thread.join()
if __name__=='__main__':
print("Serial port terminal demo - press Ctrl+C to exit")
if len(sys.argv)<=1:
port = FindModem()
if port is None:
print("No COM port specified, and no modem could be found")
print("Please re-run this script with the name of a COM port (eg COM3)")
sys.exit(1)
else:
port = sys.argv[1]
tty = SerialTTY(port)
tty.Run()