monitor.py
14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
# Copyright 2014-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Class to monitor a MongoDB server on a background thread."""
import atexit
import threading
import weakref
from pymongo import common, periodic_executor
from pymongo.errors import (NotMasterError,
OperationFailure,
_OperationCancelled)
from pymongo.ismaster import IsMaster
from pymongo.monotonic import time as _time
from pymongo.periodic_executor import _shutdown_executors
from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
from pymongo.srv_resolver import _SrvResolver
class MonitorBase(object):
def __init__(self, topology, name, interval, min_interval):
"""Base class to do periodic work on a background thread.
The the background thread is signaled to stop when the Topology or
this instance is freed.
"""
# We strongly reference the executor and it weakly references us via
# this closure. When the monitor is freed, stop the executor soon.
def target():
monitor = self_ref()
if monitor is None:
return False # Stop the executor.
monitor._run()
return True
executor = periodic_executor.PeriodicExecutor(
interval=interval,
min_interval=min_interval,
target=target,
name=name)
self._executor = executor
def _on_topology_gc(dummy=None):
# This prevents GC from waiting 10 seconds for isMaster to complete
# See test_cleanup_executors_on_client_del.
monitor = self_ref()
if monitor:
monitor.gc_safe_close()
# Avoid cycles. When self or topology is freed, stop executor soon.
self_ref = weakref.ref(self, executor.close)
self._topology = weakref.proxy(topology, _on_topology_gc)
_register(self)
def open(self):
"""Start monitoring, or restart after a fork.
Multiple calls have no effect.
"""
self._executor.open()
def gc_safe_close(self):
"""GC safe close."""
self._executor.close()
def close(self):
"""Close and stop monitoring.
open() restarts the monitor after closing.
"""
self.gc_safe_close()
def join(self, timeout=None):
"""Wait for the monitor to stop."""
self._executor.join(timeout)
def request_check(self):
"""If the monitor is sleeping, wake it soon."""
self._executor.wake()
class Monitor(MonitorBase):
def __init__(
self,
server_description,
topology,
pool,
topology_settings):
"""Class to monitor a MongoDB server on a background thread.
Pass an initial ServerDescription, a Topology, a Pool, and
TopologySettings.
The Topology is weakly referenced. The Pool must be exclusive to this
Monitor.
"""
super(Monitor, self).__init__(
topology,
"pymongo_server_monitor_thread",
topology_settings.heartbeat_frequency,
common.MIN_HEARTBEAT_INTERVAL)
self._server_description = server_description
self._pool = pool
self._settings = topology_settings
self._listeners = self._settings._pool_options.event_listeners
pub = self._listeners is not None
self._publish = pub and self._listeners.enabled_for_server_heartbeat
self._cancel_context = None
self._rtt_monitor = _RttMonitor(
topology, topology_settings, topology._create_pool_for_monitor(
server_description.address))
self.heartbeater = None
def cancel_check(self):
"""Cancel any concurrent isMaster check.
Note: this is called from a weakref.proxy callback and MUST NOT take
any locks.
"""
context = self._cancel_context
if context:
# Note: we cannot close the socket because doing so may cause
# concurrent reads/writes to hang until a timeout occurs
# (depending on the platform).
context.cancel()
def _start_rtt_monitor(self):
"""Start an _RttMonitor that periodically runs ping."""
# If this monitor is closed directly before (or during) this open()
# call, the _RttMonitor will not be closed. Checking if this monitor
# was closed directly after resolves the race.
self._rtt_monitor.open()
if self._executor._stopped:
self._rtt_monitor.close()
def gc_safe_close(self):
self._executor.close()
self._rtt_monitor.gc_safe_close()
self.cancel_check()
def close(self):
self.gc_safe_close()
self._rtt_monitor.close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
self._reset_connection()
def _reset_connection(self):
# Clear our pooled connection.
self._pool.reset()
def _run(self):
try:
prev_sd = self._server_description
try:
self._server_description = self._check_server()
except _OperationCancelled as exc:
# Already closed the connection, wait for the next check.
self._server_description = ServerDescription(
self._server_description.address, error=exc)
if prev_sd.is_server_type_known:
# Immediately retry since we've already waited 500ms to
# discover that we've been cancelled.
self._executor.skip_sleep()
return
# Update the Topology and clear the server pool on error.
self._topology.on_change(self._server_description,
reset_pool=self._server_description.error)
if (self._server_description.is_server_type_known and
self._server_description.topology_version):
self._start_rtt_monitor()
# Immediately check for the next streaming response.
self._executor.skip_sleep()
if self._server_description.error and prev_sd.is_server_type_known:
# Immediately retry on network errors.
self._executor.skip_sleep()
except ReferenceError:
# Topology was garbage-collected.
self.close()
def _check_server(self):
"""Call isMaster or read the next streaming response.
Returns a ServerDescription.
"""
start = _time()
try:
try:
return self._check_once()
except (OperationFailure, NotMasterError) as exc:
# Update max cluster time even when isMaster fails.
self._topology.receive_cluster_time(
exc.details.get('$clusterTime'))
raise
except ReferenceError:
raise
except Exception as error:
sd = self._server_description
address = sd.address
duration = _time() - start
if self._publish:
awaited = sd.is_server_type_known and sd.topology_version
self._listeners.publish_server_heartbeat_failed(
address, duration, error, awaited)
self._reset_connection()
if isinstance(error, _OperationCancelled):
raise
self._rtt_monitor.reset()
# Server type defaults to Unknown.
return ServerDescription(address, error=error)
def _check_once(self):
"""A single attempt to call ismaster.
Returns a ServerDescription, or raises an exception.
"""
address = self._server_description.address
if self._publish:
self._listeners.publish_server_heartbeat_started(address)
if self._cancel_context and self._cancel_context.cancelled:
self._reset_connection()
with self._pool.get_socket({}) as sock_info:
self._cancel_context = sock_info.cancel_context
response, round_trip_time = self._check_with_socket(sock_info)
if not response.awaitable:
self._rtt_monitor.add_sample(round_trip_time)
sd = ServerDescription(address, response,
self._rtt_monitor.average())
if self._publish:
self._listeners.publish_server_heartbeat_succeeded(
address, round_trip_time, response, response.awaitable)
return sd
def _check_with_socket(self, conn):
"""Return (IsMaster, round_trip_time).
Can raise ConnectionFailure or OperationFailure.
"""
cluster_time = self._topology.max_cluster_time()
start = _time()
if conn.more_to_come:
# Read the next streaming isMaster (MongoDB 4.4+).
response = IsMaster(conn._next_reply(), awaitable=True)
elif (conn.performed_handshake and
self._server_description.topology_version):
# Initiate streaming isMaster (MongoDB 4.4+).
response = conn._ismaster(
cluster_time,
self._server_description.topology_version,
self._settings.heartbeat_frequency,
None)
else:
# New connection handshake or polling isMaster (MongoDB <4.4).
response = conn._ismaster(cluster_time, None, None, None)
return response, _time() - start
class SrvMonitor(MonitorBase):
def __init__(self, topology, topology_settings):
"""Class to poll SRV records on a background thread.
Pass a Topology and a TopologySettings.
The Topology is weakly referenced.
"""
super(SrvMonitor, self).__init__(
topology,
"pymongo_srv_polling_thread",
common.MIN_SRV_RESCAN_INTERVAL,
topology_settings.heartbeat_frequency)
self._settings = topology_settings
self._seedlist = self._settings._seeds
self._fqdn = self._settings.fqdn
def _run(self):
seedlist = self._get_seedlist()
if seedlist:
self._seedlist = seedlist
try:
self._topology.on_srv_update(self._seedlist)
except ReferenceError:
# Topology was garbage-collected.
self.close()
def _get_seedlist(self):
"""Poll SRV records for a seedlist.
Returns a list of ServerDescriptions.
"""
try:
seedlist, ttl = _SrvResolver(self._fqdn).get_hosts_and_min_ttl()
if len(seedlist) == 0:
# As per the spec: this should be treated as a failure.
raise Exception
except Exception:
# As per the spec, upon encountering an error:
# - An error must not be raised
# - SRV records must be rescanned every heartbeatFrequencyMS
# - Topology must be left unchanged
self.request_check()
return None
else:
self._executor.update_interval(
max(ttl, common.MIN_SRV_RESCAN_INTERVAL))
return seedlist
class _RttMonitor(MonitorBase):
def __init__(self, topology, topology_settings, pool):
"""Maintain round trip times for a server.
The Topology is weakly referenced.
"""
super(_RttMonitor, self).__init__(
topology,
"pymongo_server_rtt_thread",
topology_settings.heartbeat_frequency,
common.MIN_HEARTBEAT_INTERVAL)
self._pool = pool
self._moving_average = MovingAverage()
self._lock = threading.Lock()
def close(self):
self.gc_safe_close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
self._pool.reset()
def add_sample(self, sample):
"""Add a RTT sample."""
with self._lock:
self._moving_average.add_sample(sample)
def average(self):
"""Get the calculated average, or None if no samples yet."""
with self._lock:
return self._moving_average.get()
def reset(self):
"""Reset the average RTT."""
with self._lock:
return self._moving_average.reset()
def _run(self):
try:
# NOTE: This thread is only run when when using the streaming
# heartbeat protocol (MongoDB 4.4+).
# XXX: Skip check if the server is unknown?
rtt = self._ping()
self.add_sample(rtt)
except ReferenceError:
# Topology was garbage-collected.
self.close()
except Exception:
self._pool.reset()
def _ping(self):
"""Run an "isMaster" command and return the RTT."""
with self._pool.get_socket({}) as sock_info:
if self._executor._stopped:
raise Exception('_RttMonitor closed')
start = _time()
sock_info.ismaster()
return _time() - start
# Close monitors to cancel any in progress streaming checks before joining
# executor threads. For an explanation of how this works see the comment
# about _EXECUTORS in periodic_executor.py.
_MONITORS = set()
def _register(monitor):
ref = weakref.ref(monitor, _unregister)
_MONITORS.add(ref)
def _unregister(monitor_ref):
_MONITORS.remove(monitor_ref)
def _shutdown_monitors():
if _MONITORS is None:
return
# Copy the set. Closing monitors removes them.
monitors = list(_MONITORS)
# Close all monitors.
for ref in monitors:
monitor = ref()
if monitor:
monitor.gc_safe_close()
monitor = None
def _shutdown_resources():
# _shutdown_monitors/_shutdown_executors may already be GC'd at shutdown.
shutdown = _shutdown_monitors
if shutdown:
shutdown()
shutdown = _shutdown_executors
if shutdown:
shutdown()
atexit.register(_shutdown_resources)