topology_description.py
22.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
# Copyright 2014-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Represent a deployment of MongoDB servers."""
from collections import namedtuple
from pymongo import common
from pymongo.errors import ConfigurationError
from pymongo.read_preferences import ReadPreference
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import Selection
from pymongo.server_type import SERVER_TYPE
# The kinds of MongoDB cluster topology, exposed as a namedtuple instance
# whose fields hold the integers 0..4 in declaration order.
TOPOLOGY_TYPE = namedtuple(
    'TopologyType',
    (
        'Single',
        'ReplicaSetNoPrimary',
        'ReplicaSetWithPrimary',
        'Sharded',
        'Unknown',
    ),
)(*range(5))

# Topology types for which SRV record polling is applicable.
SRV_POLLING_TOPOLOGIES = (
    TOPOLOGY_TYPE.Unknown,
    TOPOLOGY_TYPE.Sharded,
)
class TopologyDescription(object):
    """Describes a deployment of MongoDB servers.

    Holds the topology type, a dict of per-server descriptions, the replica
    set bookkeeping values and the topology settings. The methods that
    "change" the topology (`reset`, `reset_server`) return a new
    TopologyDescription.
    """

    def __init__(self,
                 topology_type,
                 server_descriptions,
                 replica_set_name,
                 max_set_version,
                 max_election_id,
                 topology_settings):
        """Representation of a deployment of MongoDB servers.

        :Parameters:
          - `topology_type`: initial type
          - `server_descriptions`: dict of (address, ServerDescription) for
            all seeds
          - `replica_set_name`: replica set name or None
          - `max_set_version`: greatest setVersion seen from a primary, or None
          - `max_election_id`: greatest electionId seen from a primary, or None
          - `topology_settings`: a TopologySettings
        """
        self._topology_type = topology_type
        self._replica_set_name = replica_set_name
        self._server_descriptions = server_descriptions
        self._max_set_version = max_set_version
        self._max_election_id = max_election_id

        # The heartbeat_frequency is used in staleness estimates.
        self._topology_settings = topology_settings

        # Is PyMongo compatible with all servers' wire protocols?
        # The message is only recorded here; check_compatible() raises it.
        self._incompatible_err = None

        for s in self._server_descriptions.values():
            if not s.is_server_type_known:
                continue

            # s.min/max_wire_version is the server's wire protocol.
            # MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports.
            server_too_new = (
                # Server too new.
                s.min_wire_version is not None
                and s.min_wire_version > common.MAX_SUPPORTED_WIRE_VERSION)

            server_too_old = (
                # Server too old.
                s.max_wire_version is not None
                and s.max_wire_version < common.MIN_SUPPORTED_WIRE_VERSION)

            if server_too_new:
                self._incompatible_err = (
                    "Server at %s:%d requires wire version %d, but this "
                    "version of PyMongo only supports up to %d."
                    % (s.address[0], s.address[1],
                       s.min_wire_version, common.MAX_SUPPORTED_WIRE_VERSION))

            elif server_too_old:
                self._incompatible_err = (
                    "Server at %s:%d reports wire version %d, but this "
                    "version of PyMongo requires at least %d (MongoDB %s)."
                    % (s.address[0], s.address[1],
                       s.max_wire_version,
                       common.MIN_SUPPORTED_WIRE_VERSION,
                       common.MIN_SUPPORTED_SERVER_VERSION))

                # Stop scanning at the first server found to be too old.
                break

        # Server Discovery And Monitoring Spec: Whenever a client updates the
        # TopologyDescription from an ismaster response, it MUST set
        # TopologyDescription.logicalSessionTimeoutMinutes to the smallest
        # logicalSessionTimeoutMinutes value among ServerDescriptions of all
        # data-bearing server types. If any have a null
        # logicalSessionTimeoutMinutes, then
        # TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
        readable_servers = self.readable_servers
        if not readable_servers:
            self._ls_timeout_minutes = None
        elif any(s.logical_session_timeout_minutes is None
                 for s in readable_servers):
            self._ls_timeout_minutes = None
        else:
            self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
                                           for s in readable_servers)

    def check_compatible(self):
        """Raise ConfigurationError if any server is incompatible.

        A server is incompatible if its wire protocol version range does not
        overlap with PyMongo's.
        """
        if self._incompatible_err:
            raise ConfigurationError(self._incompatible_err)

    def has_server(self, address):
        """Whether `address` is one of this topology's server addresses."""
        return address in self._server_descriptions

    def reset_server(self, address):
        """A copy of this description, with one server marked Unknown."""
        unknown_sd = self._server_descriptions[address].to_unknown()
        return updated_topology_description(self, unknown_sd)

    def reset(self):
        """A copy of this description, with all servers marked Unknown."""
        # With every server Unknown there can no longer be a known primary.
        if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
            topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
        else:
            topology_type = self._topology_type

        # The default ServerDescription's type is Unknown.
        sds = {address: ServerDescription(address)
               for address in self._server_descriptions}

        return TopologyDescription(
            topology_type,
            sds,
            self._replica_set_name,
            self._max_set_version,
            self._max_election_id,
            self._topology_settings)

    def server_descriptions(self):
        """Dict of (address,
        :class:`~pymongo.server_description.ServerDescription`)."""
        # Return a shallow copy so callers cannot mutate our dict.
        return self._server_descriptions.copy()

    @property
    def topology_type(self):
        """The type of this topology."""
        return self._topology_type

    @property
    def topology_type_name(self):
        """The topology type as a human readable string.

        .. versionadded:: 3.4
        """
        return TOPOLOGY_TYPE._fields[self._topology_type]

    @property
    def replica_set_name(self):
        """The replica set name."""
        return self._replica_set_name

    @property
    def max_set_version(self):
        """Greatest setVersion seen from a primary, or None."""
        return self._max_set_version

    @property
    def max_election_id(self):
        """Greatest electionId seen from a primary, or None."""
        return self._max_election_id

    @property
    def logical_session_timeout_minutes(self):
        """Minimum logical session timeout, or None."""
        return self._ls_timeout_minutes

    @property
    def known_servers(self):
        """List of Servers of types besides Unknown."""
        return [s for s in self._server_descriptions.values()
                if s.is_server_type_known]

    @property
    def has_known_servers(self):
        """Whether there are any Servers of types besides Unknown."""
        # Test the predicate itself rather than the truthiness of the
        # description objects.
        return any(s.is_server_type_known
                   for s in self._server_descriptions.values())

    @property
    def readable_servers(self):
        """List of readable Servers."""
        return [s for s in self._server_descriptions.values() if s.is_readable]

    @property
    def common_wire_version(self):
        """Minimum of all servers' max wire versions, or None."""
        servers = self.known_servers
        if servers:
            return min(s.max_wire_version for s in servers)

        return None

    @property
    def heartbeat_frequency(self):
        """The heartbeat_frequency of this topology's settings."""
        return self._topology_settings.heartbeat_frequency

    def apply_selector(self, selector, address, custom_selector=None):
        """Return the list of server descriptions matching a selector.

        :Parameters:
          - `selector`: a callable that takes a Selection and returns a
            Selection. If it has a truthy ``min_wire_version`` attribute,
            that value is checked against `common_wire_version`.
          - `address`: if truthy, the selector is ignored and only the
            description for this address (if any) is returned.
          - `custom_selector`: optional callable applied to the selected
            server descriptions before the local threshold is applied.

        Raises ConfigurationError if the selector requires a wire version
        greater than this topology's common wire version.
        """

        def apply_local_threshold(selection):
            # Keep only the servers whose round trip time is within
            # local_threshold_ms of the fastest selected server.
            if not selection:
                return []

            settings = self._topology_settings

            # Round trip time in seconds.
            fastest = min(
                s.round_trip_time for s in selection.server_descriptions)
            threshold = settings.local_threshold_ms / 1000.0
            return [s for s in selection.server_descriptions
                    if (s.round_trip_time - fastest) <= threshold]

        if getattr(selector, 'min_wire_version', 0):
            common_wv = self.common_wire_version
            # NOTE(review): a common wire version of 0 (or None) is falsy
            # and skips this check -- confirm that is intended.
            if common_wv and common_wv < selector.min_wire_version:
                raise ConfigurationError(
                    "%s requires min wire version %d, but topology's min"
                    " wire version is %d" % (selector,
                                             selector.min_wire_version,
                                             common_wv))

        if self.topology_type == TOPOLOGY_TYPE.Single:
            # Ignore selectors for standalone.
            return self.known_servers
        elif address:
            # Ignore selectors when explicit address is requested.
            description = self.server_descriptions().get(address)
            return [description] if description else []
        elif self.topology_type == TOPOLOGY_TYPE.Sharded:
            # Ignore read preference.
            selection = Selection.from_topology_description(self)
        else:
            selection = selector(Selection.from_topology_description(self))

        # Apply custom selector followed by localThresholdMS.
        if custom_selector is not None and selection:
            selection = selection.with_server_descriptions(
                custom_selector(selection.server_descriptions))
        return apply_local_threshold(selection)

    def has_readable_server(self, read_preference=ReadPreference.PRIMARY):
        """Does this topology have any readable servers available matching the
        given read preference?

        :Parameters:
          - `read_preference`: an instance of a read preference from
            :mod:`~pymongo.read_preferences`. Defaults to
            :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.

        .. note:: When connected directly to a single server this method
          always returns ``True``.

        .. versionadded:: 3.4
        """
        common.validate_read_preference("read_preference", read_preference)
        return any(self.apply_selector(read_preference, None))

    def has_writable_server(self):
        """Does this topology have a writable server available?

        .. note:: When connected directly to a single server this method
          always returns ``True``.

        .. versionadded:: 3.4
        """
        return self.has_readable_server(ReadPreference.PRIMARY)

    def __repr__(self):
        """Show the class name, topology id, type name and sorted servers."""
        # Sort the servers by address.
        servers = sorted(self._server_descriptions.values(),
                         key=lambda sd: sd.address)
        return "<%s id: %s, topology_type: %s, servers: %r>" % (
            self.__class__.__name__, self._topology_settings._topology_id,
            self.topology_type_name, servers)
# Lookup table used while the topology type is still Unknown: maps the
# server type reported by an ismaster response to the topology type that
# should be adopted. Server types that are absent from the table are
# handled separately by the caller.
_SERVER_TYPE_TO_TOPOLOGY_TYPE = {
    SERVER_TYPE.Mongos: TOPOLOGY_TYPE.Sharded,
    SERVER_TYPE.RSPrimary: TOPOLOGY_TYPE.ReplicaSetWithPrimary,
    SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
    SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
    SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
}
def updated_topology_description(topology_description, server_description):
    """Build a new TopologyDescription that reflects one server's update.

    :Parameters:
      - `topology_description`: the current TopologyDescription
      - `server_description`: a new ServerDescription that resulted from
        an ismaster call

    Invoked after an ismaster attempt (successful or not) against the server
    at server_description.address. The TopologyDescription passed in is left
    unmodified; a new one is returned.
    """
    address = server_description.address

    # Start from the current values; each may be replaced below.
    new_type = topology_description.topology_type
    rs_name = topology_description.replica_set_name
    set_version = topology_description.max_set_version
    election_id = topology_description.max_election_id
    server_type = server_description.server_type

    # server_descriptions() hands back a copy, so the original dict is
    # never mutated. Install the fresh description for this server.
    descriptions = topology_description.server_descriptions()
    descriptions[address] = server_description

    if new_type == TOPOLOGY_TYPE.Single:
        # A replica set name mismatch demotes the server to Unknown.
        name_mismatch = (
            rs_name is not None
            and rs_name != server_description.replica_set_name)
        if name_mismatch:
            error = ConfigurationError(
                "client is configured to connect to a replica set named "
                "'%s' but this node belongs to a set named '%s'" % (
                    rs_name, server_description.replica_set_name))
            descriptions[address] = server_description.to_unknown(error=error)
        # A Single topology stays Single.
        return TopologyDescription(
            TOPOLOGY_TYPE.Single,
            descriptions,
            rs_name,
            set_version,
            election_id,
            topology_description._topology_settings)

    not_rs_types = (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos)
    rs_member_types = (SERVER_TYPE.RSSecondary,
                       SERVER_TYPE.RSArbiter,
                       SERVER_TYPE.RSOther)

    if new_type == TOPOLOGY_TYPE.Unknown:
        if server_type == SERVER_TYPE.Standalone:
            seeds = topology_description._topology_settings.seeds
            if len(seeds) == 1:
                new_type = TOPOLOGY_TYPE.Single
            else:
                # With several seeds a standalone is dropped from the
                # topology.
                descriptions.pop(address)
        elif server_type not in (SERVER_TYPE.Unknown, SERVER_TYPE.RSGhost):
            new_type = _SERVER_TYPE_TO_TOPOLOGY_TYPE[server_type]

    if new_type == TOPOLOGY_TYPE.Sharded:
        if server_type not in (SERVER_TYPE.Mongos, SERVER_TYPE.Unknown):
            descriptions.pop(address)

    elif new_type in (TOPOLOGY_TYPE.ReplicaSetNoPrimary,
                      TOPOLOGY_TYPE.ReplicaSetWithPrimary):
        had_primary = new_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary

        if server_type in not_rs_types:
            descriptions.pop(address)
            if had_primary:
                new_type = _check_has_primary(descriptions)

        elif server_type == SERVER_TYPE.RSPrimary:
            updated = _update_rs_from_primary(descriptions,
                                              rs_name,
                                              server_description,
                                              set_version,
                                              election_id)
            new_type, rs_name, set_version, election_id = updated

        elif server_type in rs_member_types:
            if had_primary:
                new_type = _update_rs_with_primary_from_member(
                    descriptions, rs_name, server_description)
            else:
                new_type, rs_name = _update_rs_no_primary_from_member(
                    descriptions, rs_name, server_description)

        elif had_primary:
            # Server type is Unknown or RSGhost: did we just lose the primary?
            new_type = _check_has_primary(descriptions)

    # Hand back the updated copy.
    return TopologyDescription(new_type,
                               descriptions,
                               rs_name,
                               set_version,
                               election_id,
                               topology_description._topology_settings)
def _updated_topology_description_srv_polling(topology_description, seedlist):
    """Return an updated copy of a TopologyDescription after SRV polling.

    :Parameters:
      - `topology_description`: the current TopologyDescription
      - `seedlist`: a list of the server addresses that should now make up
        the topology

    If the set of addresses is unchanged, `topology_description` itself is
    returned. Otherwise a new TopologyDescription is returned in which
    addresses new to `seedlist` get a default ServerDescription and
    addresses missing from `seedlist` are dropped; all other fields are
    carried over unchanged.
    """
    # Create a copy of the server descriptions.
    sds = topology_description.server_descriptions()

    # Build the set once; it serves both the equality check and the
    # membership tests in the removal loop below.
    seeds = set(seedlist)

    # If seeds haven't changed, don't do anything.
    if set(sds) == seeds:
        return topology_description

    # Add SDs corresponding to servers recently added to the SRV record.
    for address in seedlist:
        if address not in sds:
            sds[address] = ServerDescription(address)

    # Remove SDs corresponding to servers no longer part of the SRV record.
    # Iterate over a snapshot of the keys because the dict is mutated.
    for address in list(sds):
        if address not in seeds:
            sds.pop(address)

    return TopologyDescription(
        topology_description.topology_type,
        sds,
        topology_description.replica_set_name,
        topology_description.max_set_version,
        topology_description.max_election_id,
        topology_description._topology_settings)
def _update_rs_from_primary(
        sds,
        replica_set_name,
        server_description,
        max_set_version,
        max_election_id):
    """Update topology description from a primary's ismaster response.

    Pass in a dict of ServerDescriptions, current replica set name, the
    ServerDescription we are processing, and the TopologyDescription's
    max_set_version and max_election_id if any.

    The `sds` dict is modified in place: the responding server may be
    removed or marked Unknown, a previously known primary is marked Unknown,
    hosts listed by the primary are added and hosts it does not list are
    removed.

    Returns (new topology type, new replica_set_name, new max_set_version,
    new max_election_id).
    """
    if replica_set_name is None:
        # No name configured yet: adopt the one this primary reports.
        replica_set_name = server_description.replica_set_name

    elif replica_set_name != server_description.replica_set_name:
        # We found a primary but it doesn't have the replica_set_name
        # provided by the user.
        sds.pop(server_description.address)
        return (_check_has_primary(sds),
                replica_set_name,
                max_set_version,
                max_election_id)

    max_election_tuple = max_set_version, max_election_id
    if None not in server_description.election_tuple:
        if (None not in max_election_tuple and
                max_election_tuple > server_description.election_tuple):

            # Stale primary, set to type Unknown.
            sds[server_description.address] = server_description.to_unknown()
            return (_check_has_primary(sds),
                    replica_set_name,
                    max_set_version,
                    max_election_id)

        max_election_id = server_description.election_id

    if (server_description.set_version is not None and
            (max_set_version is None or
             server_description.set_version > max_set_version)):

        max_set_version = server_description.set_version

    # We've heard from the primary. Is it the same primary as before?
    for server in sds.values():
        # Compare by value, as the other server type checks in this module
        # do, rather than by object identity.
        if (server.server_type == SERVER_TYPE.RSPrimary
                and server.address != server_description.address):

            # Reset old primary's type to Unknown. Replacing the value of
            # an existing key does not resize the dict being iterated.
            sds[server.address] = server.to_unknown()

            # There can be only one prior primary.
            break

    # Discover new hosts from this primary's response.
    for new_address in server_description.all_hosts:
        if new_address not in sds:
            sds[new_address] = ServerDescription(new_address)

    # Remove hosts not in the response.
    for addr in set(sds) - server_description.all_hosts:
        sds.pop(addr)

    # If the host list differs from the seed list, we may not have a primary
    # after all.
    return (_check_has_primary(sds),
            replica_set_name,
            max_set_version,
            max_election_id)
def _update_rs_with_primary_from_member(
        sds,
        replica_set_name,
        server_description):
    """Handle a non-primary's response when a primary is already known.

    Takes the dict of ServerDescriptions (modified in place), the current
    replica set name, and the ServerDescription being processed. The server
    is dropped from the dict when it reports a different replica set name,
    or when it reports a `me` address that differs from its own address.

    Returns new topology type.
    """
    assert replica_set_name is not None

    member_address = server_description.address
    if (replica_set_name != server_description.replica_set_name
            or (server_description.me
                and member_address != server_description.me)):
        sds.pop(member_address)

    # This member may have been the primary until now, so re-check.
    return _check_has_primary(sds)
def _update_rs_no_primary_from_member(
        sds,
        replica_set_name,
        server_description):
    """Handle a non-primary's response when no primary is known.

    Takes the dict of ServerDescriptions (modified in place), the current
    replica set name, and the ServerDescription being processed.

    Returns (new topology type, new replica_set_name); the topology type is
    always ReplicaSetNoPrimary.
    """
    no_primary = TOPOLOGY_TYPE.ReplicaSetNoPrimary
    reported_name = server_description.replica_set_name
    member_address = server_description.address

    if replica_set_name is None:
        # No name known yet: take the one this member reports.
        replica_set_name = reported_name
    elif replica_set_name != reported_name:
        # The member belongs to some other set; drop it and stop here.
        sds.pop(member_address)
        return no_primary, replica_set_name

    # Only a primary's host list is used to remove servers, so a
    # non-primary's response may add hosts but never removes any.
    for host in server_description.all_hosts:
        if host not in sds:
            sds[host] = ServerDescription(host)

    # Drop the member if it reports a `me` address unlike its own address.
    if server_description.me and member_address != server_description.me:
        sds.pop(member_address)

    return no_primary, replica_set_name
def _check_has_primary(sds):
    """Report whether any server in `sds` is currently a replica set primary.

    Pass in a dict of ServerDescriptions.

    Returns ReplicaSetWithPrimary if at least one description has server
    type RSPrimary, otherwise ReplicaSetNoPrimary.
    """
    primary_known = any(
        description.server_type == SERVER_TYPE.RSPrimary
        for description in sds.values())
    if primary_known:
        return TOPOLOGY_TYPE.ReplicaSetWithPrimary
    return TOPOLOGY_TYPE.ReplicaSetNoPrimary