source: vendor/current/lib/dnspython/dns/dnssec.py@ 740

Last change on this file since 740 was 740, checked in by Silvan Scherrer, 12 years ago

Samba Server: update vendor to 3.6.0

File size: 12.1 KB
Line 
1# Copyright (C) 2003-2007, 2009 Nominum, Inc.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose with or without fee is hereby granted,
5# provided that the above copyright notice and this permission notice
6# appear in all copies.
7#
8# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16"""Common DNSSEC-related functions and constants."""
17
18import cStringIO
19import struct
20import time
21
22import dns.exception
23import dns.hash
24import dns.name
25import dns.node
26import dns.rdataset
27import dns.rdata
28import dns.rdatatype
29import dns.rdataclass
30
class UnsupportedAlgorithm(dns.exception.DNSException):
    """Raised if an algorithm is not supported."""
    # No behaviour of its own; make_ds() raises this for digest names
    # other than SHA1 and SHA256.
34
class ValidationFailure(dns.exception.DNSException):
    """The DNSSEC signature is invalid."""
    # No behaviour of its own; the validation helpers in this module
    # raise it with a short reason string (e.g. 'expired', 'unknown key').
38
# DNSSEC algorithm numbers.
RSAMD5 = 1
DH = 2
DSA = 3
ECC = 4
RSASHA1 = 5
DSANSEC3SHA1 = 6
RSASHA1NSEC3SHA1 = 7
RSASHA256 = 8
RSASHA512 = 10
INDIRECT = 252
PRIVATEDNS = 253
PRIVATEOID = 254

# Mnemonic -> algorithm number.
_algorithm_by_text = {
    'RSAMD5': RSAMD5,
    'DH': DH,
    'DSA': DSA,
    'ECC': ECC,
    'RSASHA1': RSASHA1,
    'DSANSEC3SHA1': DSANSEC3SHA1,
    'RSASHA1NSEC3SHA1': RSASHA1NSEC3SHA1,
    'RSASHA256': RSASHA256,
    'RSASHA512': RSASHA512,
    'INDIRECT': INDIRECT,
    'PRIVATEDNS': PRIVATEDNS,
    'PRIVATEOID': PRIVATEOID,
}

# We construct the inverse mapping programmatically to ensure that we
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be a true inverse.
#
# A generator expression is used (rather than a list comprehension) so
# that the loop variables do not leak into the module namespace, and
# items() is used so the construction does not depend on the
# Python-2-only iteritems().

_algorithm_by_value = dict((number, mnemonic)
                           for mnemonic, number in _algorithm_by_text.items())
72
def algorithm_from_text(text):
    """Convert text into a DNSSEC algorithm value.

    Known mnemonics are matched case-insensitively; any other text is
    converted with int(), so a ValueError propagates if it is not a
    number.

    @param text: an algorithm mnemonic or a decimal number
    @type text: string
    @rtype: int"""

    mnemonic = text.upper()
    if mnemonic in _algorithm_by_text:
        return _algorithm_by_text[mnemonic]
    return int(text)
81
def algorithm_to_text(value):
    """Convert a DNSSEC algorithm value to text.

    Values without a known mnemonic are rendered with str().

    @param value: the algorithm number
    @type value: int
    @rtype: string"""

    try:
        return _algorithm_by_value[value]
    except KeyError:
        return str(value)
90
def _to_rdata(record, origin):
    """Return what record.to_wire() writes, collected into one string.

    @param record: an object providing to_wire(file, origin=...)
    @param origin: passed through to to_wire()
    @rtype: string"""
    wire = cStringIO.StringIO()
    record.to_wire(wire, origin=origin)
    return wire.getvalue()
95
def key_id(key, origin=None):
    """Compute the 16-bit key tag of a key rdata.

    @param key: the key rdata; its algorithm attribute and wire form
    are used
    @param origin: passed through when rendering the key to wire form
    @rtype: int"""
    wire = _to_rdata(key, origin)
    if key.algorithm == RSAMD5:
        # RSAMD5 keys take the tag from the third- and second-to-last
        # octets of the rdata instead of a checksum.
        return (ord(wire[-3]) << 8) + ord(wire[-2])

    # Sum the rdata as big-endian 16-bit words: octets at even offsets
    # are the high byte, octets at odd offsets the low byte.  A trailing
    # unpaired octet sits at an even offset and so counts as a high byte.
    total = 0
    for offset, octet in enumerate(wire):
        if offset % 2 == 0:
            total += ord(octet) << 8
        else:
            total += ord(octet)
    # Fold the carry back in and keep the low 16 bits.
    total += (total >> 16) & 0xffff
    return total & 0xffff
108
def make_ds(name, key, algorithm, origin=None):
    """Build a DS rdata for a key.

    The digest covers the canonicalized owner name in wire form followed
    by the key's rdata in wire form.

    @param name: the owner name of the key
    @type name: dns.name.Name or string
    @param key: the key rdata the DS refers to
    @param algorithm: the digest to use, 'SHA1' or 'SHA256' (any case)
    @type algorithm: string
    @param origin: origin used when converting a textual name and when
    rendering the key
    @type origin: dns.name.Name or None
    @raises UnsupportedAlgorithm: if algorithm is not SHA1 or SHA256
    @return: the DS rdata parsed back from the assembled wire form"""
    digest_name = algorithm.upper()
    if digest_name == 'SHA1':
        dsalg = 1
    elif digest_name == 'SHA256':
        dsalg = 2
    else:
        raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
    hasher = dns.hash.get(digest_name)()

    if isinstance(name, (str, unicode)):
        name = dns.name.from_text(name, origin)
    hasher.update(name.canonicalize().to_wire())
    hasher.update(_to_rdata(key, origin))

    # DS rdata layout: key tag (16 bits), key algorithm (8 bits),
    # digest type (8 bits), then the digest itself.
    header = struct.pack("!HBB", key_id(key), key.algorithm, dsalg)
    dsrdata = header + hasher.digest()
    return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS,
                               dsrdata, 0, len(dsrdata))
128
def _find_key(keys, rrsig):
    """Find the key rdata that a signature claims to have been made with.

    @param keys: the key dictionary, keyed by signer name; each value is
    either a dns.node.Node or an iterable of key rdatas
    @param rrsig: the signature rdata; its signer, algorithm and key_tag
    attributes select the key
    @return: the first key whose algorithm and key tag match the
    signature, or None if there is no such key"""
    value = keys.get(rrsig.signer)
    if value is None:
        return None
    if isinstance(value, dns.node.Node):
        try:
            # Look the DNSKEY rdataset up on the node we were given.
            # (This used to reference an undefined name, 'node', and
            # raised NameError for every node-valued entry.)
            rdataset = value.find_rdataset(dns.rdataclass.IN,
                                           dns.rdatatype.DNSKEY)
        except KeyError:
            return None
    else:
        rdataset = value
    for rdata in rdataset:
        if rdata.algorithm == rrsig.algorithm and \
           key_id(rdata) == rrsig.key_tag:
            return rdata
    return None
146
def _is_rsa(algorithm):
    """Return True if algorithm is one of the RSA-based algorithms."""
    rsa_algorithms = (RSAMD5, RSASHA1, RSASHA1NSEC3SHA1,
                      RSASHA256, RSASHA512)
    return algorithm in rsa_algorithms
151
def _is_dsa(algorithm):
    """Return True if algorithm is one of the DSA-based algorithms."""
    dsa_algorithms = (DSA, DSANSEC3SHA1)
    return algorithm in dsa_algorithms
154
def _is_md5(algorithm):
    """Return True if algorithm hashes with MD5 (only RSAMD5 does)."""
    return algorithm == RSAMD5
157
def _is_sha1(algorithm):
    """Return True if algorithm hashes with SHA1."""
    sha1_algorithms = (DSA, RSASHA1, DSANSEC3SHA1, RSASHA1NSEC3SHA1)
    return algorithm in sha1_algorithms
161
def _is_sha256(algorithm):
    """Return True if algorithm hashes with SHA256 (only RSASHA256 does)."""
    return algorithm == RSASHA256
164
def _is_sha512(algorithm):
    """Return True if algorithm hashes with SHA512 (only RSASHA512 does)."""
    return algorithm == RSASHA512
167
def _make_hash(algorithm):
    """Create a fresh hash object for a DNSSEC algorithm.

    @param algorithm: the DNSSEC algorithm number
    @type algorithm: int
    @raises ValidationFailure: if no hash is known for the algorithm
    @return: a new hash object obtained from dns.hash"""
    if _is_md5(algorithm):
        digest_name = 'MD5'
    elif _is_sha1(algorithm):
        digest_name = 'SHA1'
    elif _is_sha256(algorithm):
        digest_name = 'SHA256'
    elif _is_sha512(algorithm):
        digest_name = 'SHA512'
    else:
        raise ValidationFailure('unknown hash for algorithm %u' % algorithm)
    return dns.hash.get(digest_name)()
178
def _make_algorithm_id(algorithm):
    """Build the DER header that precedes the digest in an RSA signature.

    The result encodes SEQUENCE { SEQUENCE { OID, NULL }, OCTET STRING
    header }; the caller appends the raw digest to it.

    @param algorithm: the DNSSEC algorithm number
    @type algorithm: int
    @raises ValidationFailure: if the algorithm's hash is not known
    @rtype: string"""
    # Hash OIDs (content octets only), tried in the same order as the
    # hash predicates are tried elsewhere in this module.
    known_oids = (
        (_is_md5, [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]),
        (_is_sha1, [0x2b, 0x0e, 0x03, 0x02, 0x1a]),
        (_is_sha256, [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]),
        (_is_sha512, [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]),
    )
    for uses_hash, oid in known_oids:
        if uses_hash(algorithm):
            break
    else:
        # No predicate matched.
        raise ValidationFailure('unknown algorithm %u' % algorithm)

    oid_size = len(oid)
    digest_size = _make_hash(algorithm).digest_size
    # Inner SEQUENCE: the OID followed by a NULL parameter.
    algorithm_identifier = [0x30, oid_size + 4, 0x06, oid_size] + oid + \
                           [0x05, 0x00]
    # OCTET STRING tag and length; the digest octets themselves are
    # appended by the caller.
    octet_string_header = [0x04, digest_size]
    # Outer SEQUENCE length: 8 tag/length octets plus OID and digest.
    digest_info = [0x30, 8 + oid_size + digest_size] + \
                  algorithm_identifier + octet_string_header
    return ''.join([chr(octet) for octet in digest_info])
196
def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
    """Validate an RRset against a single signature rdata

    The owner name of the rrsig is assumed to be the same as the owner name
    of the rrset.

    @param rrset: The RRset to validate
    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param rrsig: The signature rdata
    @type rrsig: dns.rrset.Rdata
    @param keys: The key dictionary.
    @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name or None
    @param now: The time to use when validating the signatures.  The default
    is the current time.
    @type now: int
    @raises ValidationFailure: if no matching key is found, the signature
    is expired or not yet valid, the algorithm is not supported, or the
    signature does not verify
    @return: None; success is signalled by returning without raising
    """

    # A textual origin is accepted and converted to a name relative to
    # the root.
    if isinstance(origin, (str, unicode)):
        origin = dns.name.from_text(origin, dns.name.root)

    key = _find_key(keys, rrsig)
    if not key:
        raise ValidationFailure, 'unknown key'

    # For convenience, allow the rrset to be specified as a (name, rdataset)
    # tuple as well as a proper rrset
    if isinstance(rrset, tuple):
        rrname = rrset[0]
        rdataset = rrset[1]
    else:
        rrname = rrset.name
        rdataset = rrset

    # Check the signature validity window against 'now'.
    if now is None:
        now = time.time()
    if rrsig.expiration < now:
        raise ValidationFailure, 'expired'
    if rrsig.inception > now:
        raise ValidationFailure, 'not yet valid'

    hash = _make_hash(rrsig.algorithm)

    if _is_rsa(rrsig.algorithm):
        # RSA key data: a one-octet exponent length (or a zero octet
        # followed by a two-octet length), then the exponent, then the
        # modulus in the remaining octets.
        keyptr = key.key
        (bytes,) = struct.unpack('!B', keyptr[0:1])
        keyptr = keyptr[1:]
        if bytes == 0:
            (bytes,) = struct.unpack('!H', keyptr[0:2])
            keyptr = keyptr[2:]
        rsa_e = keyptr[0:bytes]
        rsa_n = keyptr[bytes:]
        # Modulus size in bits; used below to size the padding.
        keylen = len(rsa_n) * 8
        pubkey = Crypto.PublicKey.RSA.construct(
            (Crypto.Util.number.bytes_to_long(rsa_n),
             Crypto.Util.number.bytes_to_long(rsa_e)))
        sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
    elif _is_dsa(rrsig.algorithm):
        # DSA key data: a one-octet size parameter T, a 20-octet Q, then
        # P, G and Y, each 64 + T * 8 octets long.
        keyptr = key.key
        (t,) = struct.unpack('!B', keyptr[0:1])
        keyptr = keyptr[1:]
        octets = 64 + t * 8
        dsa_q = keyptr[0:20]
        keyptr = keyptr[20:]
        dsa_p = keyptr[0:octets]
        keyptr = keyptr[octets:]
        dsa_g = keyptr[0:octets]
        keyptr = keyptr[octets:]
        dsa_y = keyptr[0:octets]
        pubkey = Crypto.PublicKey.DSA.construct(
            (Crypto.Util.number.bytes_to_long(dsa_y),
             Crypto.Util.number.bytes_to_long(dsa_g),
             Crypto.Util.number.bytes_to_long(dsa_p),
             Crypto.Util.number.bytes_to_long(dsa_q)))
        # The first signature octet is skipped; the rest is a 20-octet R
        # followed by a 20-octet S.
        (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
        sig = (Crypto.Util.number.bytes_to_long(dsa_r),
               Crypto.Util.number.bytes_to_long(dsa_s))
    else:
        raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm

    # The signed data starts with the first 18 octets of the RRSIG wire
    # form (the fields ahead of the signer name), followed by the signer
    # name in digestable form.
    hash.update(_to_rdata(rrsig, origin)[:18])
    hash.update(rrsig.signer.to_digestable(origin))

    # If the signature's label count is smaller than the owner name's,
    # hash a wildcard name instead: '*' prepended to the part of the
    # owner name that remains after splitting at rrsig.labels + 1.
    if rrsig.labels < len(rrname) - 1:
        suffix = rrname.split(rrsig.labels + 1)[1]
        rrname = dns.name.from_text('*', suffix)
    rrnamebuf = rrname.to_digestable(origin)
    # Per-record fixed fields: type, class and the signature's original TTL.
    rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
                          rrsig.original_ttl)
    # Records are hashed in sorted order, each as
    # name | fixed fields | rdata length | rdata.
    rrlist = sorted(rdataset);
    for rr in rrlist:
        hash.update(rrnamebuf)
        hash.update(rrfixed)
        rrdata = rr.to_digestable(origin)
        rrlen = struct.pack('!H', len(rrdata))
        hash.update(rrlen)
        hash.update(rrdata)

    digest = hash.digest()

    if _is_rsa(rrsig.algorithm):
        # PKCS1 algorithm identifier goop
        digest = _make_algorithm_id(rrsig.algorithm) + digest
        # Pad to the modulus size: 00 01 FF..FF 00 followed by the
        # identifier and digest (3 = the 00, 01 and 00 octets).
        padlen = keylen / 8 - len(digest) - 3
        digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
    elif _is_dsa(rrsig.algorithm):
        # DSA verifies the bare digest.
        pass
    else:
        # Raise here for code clarity; this won't actually ever happen
        # since if the algorithm is really unknown we'd already have
        # raised an exception above
        raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm

    if not pubkey.verify(digest, sig):
        raise ValidationFailure, 'verify failure'
314
def _validate(rrset, rrsigset, keys, origin=None, now=None):
    """Validate an RRset

    The RRset is accepted as soon as any one signature in rrsigset
    validates.

    @param rrset: The RRset to validate
    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param rrsigset: The signature RRset
    @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param keys: The key dictionary.
    @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name or None
    @param now: The time to use when validating the signatures.  The default
    is the current time.
    @type now: int
    @raises ValidationFailure: if the owner names of rrset and rrsigset
    differ, or if none of the signatures validates
    @return: None; success is signalled by returning without raising
    """

    # A textual origin is accepted and converted to a name relative to
    # the root.
    if isinstance(origin, (str, unicode)):
        origin = dns.name.from_text(origin, dns.name.root)

    if isinstance(rrset, tuple):
        rrname = rrset[0]
    else:
        rrname = rrset.name

    if isinstance(rrsigset, tuple):
        rrsigname = rrsigset[0]
        rrsigrdataset = rrsigset[1]
    else:
        rrsigname = rrsigset.name
        rrsigrdataset = rrsigset

    # Bring both owner names to the same relativity before comparing.
    # Each name must be normalized from itself: deriving rrsigname from
    # rrname (as this code once did) makes the comparison always succeed.
    rrname = rrname.choose_relativity(origin)
    rrsigname = rrsigname.choose_relativity(origin)
    if rrname != rrsigname:
        raise ValidationFailure("owner names do not match")

    for rrsig in rrsigrdataset:
        try:
            _validate_rrsig(rrset, rrsig, keys, origin, now)
            return
        except ValidationFailure:
            # Deliberate: one bad signature is not fatal, so move on to
            # the next one.
            pass
    raise ValidationFailure("no RRSIGs validated")
360
def _need_pycrypto(*args, **kwargs):
    """Stand-in for the validation entry points when pycrypto is missing.

    Accepts and ignores any arguments.

    @raises NotImplementedError: always"""
    raise NotImplementedError("DNSSEC validation requires pycrypto")
363
# Publish the public entry points.  The real validators are exported only
# when all three pycrypto modules import; otherwise both names point at
# a stub that raises NotImplementedError when called.
try:
    import Crypto.PublicKey.RSA
    import Crypto.PublicKey.DSA
    import Crypto.Util.number
except ImportError:
    validate = _need_pycrypto
    validate_rrsig = _need_pycrypto
else:
    validate = _validate
    validate_rrsig = _validate_rrsig
Note: See TracBrowser for help on using the repository browser.