test_win32pipe.py
5.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import unittest
import time
import threading
from pywin32_testutil import str2bytes # py3k-friendly helper
import win32pipe
import win32file
import win32event
import pywintypes
import winerror
import win32con
class PipeTests(unittest.TestCase):
pipename = "\\\\.\\pipe\\python_test_pipe"
def _serverThread(self, pipe_handle, event, wait_time):
# just do one connection and terminate.
hr = win32pipe.ConnectNamedPipe(pipe_handle)
self.failUnless(hr in (0, winerror.ERROR_PIPE_CONNECTED), "Got error code 0x%x" % (hr,))
hr, got = win32file.ReadFile(pipe_handle, 100)
self.failUnlessEqual(got, str2bytes("foo\0bar"))
time.sleep(wait_time)
win32file.WriteFile(pipe_handle, str2bytes("bar\0foo"))
pipe_handle.Close()
event.set()
def startPipeServer(self, event, wait_time = 0):
openMode = win32pipe.PIPE_ACCESS_DUPLEX
pipeMode = win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_WAIT
sa = pywintypes.SECURITY_ATTRIBUTES()
sa.SetSecurityDescriptorDacl ( 1, None, 0 )
pipe_handle = win32pipe.CreateNamedPipe(self.pipename,
openMode,
pipeMode,
win32pipe.PIPE_UNLIMITED_INSTANCES,
0,
0,
2000,
sa)
threading.Thread(target=self._serverThread, args=(pipe_handle, event, wait_time)).start()
def testCallNamedPipe(self):
event = threading.Event()
self.startPipeServer(event)
got = win32pipe.CallNamedPipe(self.pipename,str2bytes("foo\0bar"), 1024, win32pipe.NMPWAIT_WAIT_FOREVER)
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testTransactNamedPipeBlocking(self):
event = threading.Event()
self.startPipeServer(event)
open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
hpipe = win32file.CreateFile(self.pipename,
open_mode,
0, # no sharing
None, # default security
win32con.OPEN_EXISTING,
0, # win32con.FILE_FLAG_OVERLAPPED,
None)
# set to message mode.
win32pipe.SetNamedPipeHandleState(
hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), 1024, None)
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testTransactNamedPipeBlockingBuffer(self):
# Like testTransactNamedPipeBlocking, but a pre-allocated buffer is
# passed (not really that useful, but it exercises the code path)
event = threading.Event()
self.startPipeServer(event)
open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
hpipe = win32file.CreateFile(self.pipename,
open_mode,
0, # no sharing
None, # default security
win32con.OPEN_EXISTING,
0, # win32con.FILE_FLAG_OVERLAPPED,
None)
# set to message mode.
win32pipe.SetNamedPipeHandleState(
hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
buffer = win32file.AllocateReadBuffer(1024)
hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, None)
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testTransactNamedPipeAsync(self):
event = threading.Event()
overlapped = pywintypes.OVERLAPPED()
overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
self.startPipeServer(event, 0.5)
open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
hpipe = win32file.CreateFile(self.pipename,
open_mode,
0, # no sharing
None, # default security
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_OVERLAPPED,
None)
# set to message mode.
win32pipe.SetNamedPipeHandleState(
hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
buffer = win32file.AllocateReadBuffer(1024)
hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped)
self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING)
nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
got = buffer[:nbytes]
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
if __name__ == '__main__':
unittest.main()