source: trunk/server/lib/tdb/common/lock.c@ 878

Last change on this file since 878 was 878, checked in by Silvan Scherrer, 10 years ago

Samba server 3.6: fixed some tdb problems

File size: 24.1 KB
Line 
1 /*
2 Unix SMB/CIFS implementation.
3
4 trivial database library
5
6 Copyright (C) Andrew Tridgell 1999-2005
7 Copyright (C) Paul `Rusty' Russell 2000
8 Copyright (C) Jeremy Allison 2000-2003
9
10 ** NOTE! The following LGPL license applies to the tdb
11 ** library. This does NOT imply that all of Samba is released
12 ** under the LGPL
13
14 This library is free software; you can redistribute it and/or
15 modify it under the terms of the GNU Lesser General Public
16 License as published by the Free Software Foundation; either
17 version 3 of the License, or (at your option) any later version.
18
19 This library is distributed in the hope that it will be useful,
20 but WITHOUT ANY WARRANTY; without even the implied warranty of
21 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22 Lesser General Public License for more details.
23
24 You should have received a copy of the GNU Lesser General Public
25 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26*/
27
28#include "tdb_private.h"
29
/*
 * Register a signal-flag pointer: when a signal handler sets *ptr
 * non-zero, a blocked lock wait in tdb_brlock() gives up instead of
 * retrying on EINTR.
 */
_PUBLIC_ void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
{
	tdb->interrupt_sig_ptr = ptr;
}
34
/*
 * Thin wrapper around fcntl() byte-range locking.
 * rw is F_RDLCK or F_WRLCK; a len of 0 means "to end of file".
 * Returns the fcntl() result: 0 on success, -1 with errno set.
 */
static int fcntl_lock(struct tdb_context *tdb,
		      int rw, off_t off, off_t len, bool waitflag)
{
	struct flock fl;

	fl.l_type = rw;
	fl.l_whence = SEEK_SET;
	fl.l_start = off;
	fl.l_len = len;
	fl.l_pid = 0;

#ifdef __OS2__
	int rc = 0;
	int lockFile = 0;

	/* OS/2 keeps the global bookkeeping locks on a separate handle. */
	if (off == ACTIVE_LOCK || off == OPEN_LOCK || off == TRANSACTION_LOCK)
		lockFile = tdb->hActiveLock;
	else
		lockFile = tdb->fd;

	int cmd = 0;
	if (waitflag)
		cmd = F_SETLKW;
	else
		cmd = F_SETLK;

	rc = fcntl(lockFile, cmd, &fl);
	// if the first lock doesn't work and it's a complete lock,
	// we split it in 2 parts. first hash size*4 and then the rest
	if (rc != 0 && off == FREELIST_TOP && len == 0) {
		fl.l_len = tdb->header.hash_size * 4;
		rc = fcntl(lockFile, cmd, &fl);
		if (rc == 0) {
			/* First half taken; now lock the remainder to EOF. */
			fl.l_start = off + tdb->header.hash_size * 4;
			fl.l_len = 0;
			rc = fcntl(lockFile, cmd, &fl);
		}
	}

	/* NOTE(review): %lld assumes off_t is 64-bit on OS/2 — confirm. */
	TDB_LOG((tdb, TDB_DEBUG_TRACE,"fcntl_lock: (fd=%d) offset=%lld rw_type=%d len=%lld waitflag=%d (rc=%d) pid=%d\n",
		lockFile, off, rw, len, waitflag, rc, getpid()));

	return rc;
#else
	if (waitflag)
		return fcntl(tdb->fd, F_SETLKW, &fl);
	else
		return fcntl(tdb->fd, F_SETLK, &fl);
#endif
}
85
/*
 * Release a byte-range lock taken by fcntl_lock().
 * A len of 0 means "to end of file". Returns the fcntl() result.
 */
static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
{
	struct flock fl;
#if 0 /* Check they matched up locks and unlocks correctly. */
	/* Debug-only cross-check against /proc/locks (Linux). Disabled;
	 * note it never checks the fopen() result and mixes the
	 * " POSIX ADVISORY " / " FLOCK ADVISORY " prefixes. */
	char line[80];
	FILE *locks;
	bool found = false;

	locks = fopen("/proc/locks", "r");

	while (fgets(line, 80, locks)) {
		char *p;
		int type, start, l;

		/* eg. 1: FLOCK ADVISORY WRITE 2440 08:01:2180826 0 EOF */
		p = strchr(line, ':') + 1;
		if (strncmp(p, " POSIX ADVISORY ", strlen(" POSIX ADVISORY ")))
			continue;
		p += strlen(" FLOCK ADVISORY ");
		if (strncmp(p, "READ ", strlen("READ ")) == 0)
			type = F_RDLCK;
		else if (strncmp(p, "WRITE ", strlen("WRITE ")) == 0)
			type = F_WRLCK;
		else
			abort();
		p += 6;
		if (atoi(p) != getpid())
			continue;
		p = strchr(strchr(p, ' ') + 1, ' ') + 1;
		start = atoi(p);
		p = strchr(p, ' ') + 1;
		if (strncmp(p, "EOF", 3) == 0)
			l = 0;
		else
			l = atoi(p) - start + 1;

		if (off == start) {
			if (len != l) {
				fprintf(stderr, "Len %u should be %u: %s",
					(int)len, l, line);
				abort();
			}
			if (type != rw) {
				fprintf(stderr, "Type %s wrong: %s",
					rw == F_RDLCK ? "READ" : "WRITE", line);
				abort();
			}
			found = true;
			break;
		}
	}

	if (!found) {
		fprintf(stderr, "Unlock on %u@%u not found!\n",
			(int)off, (int)len);
		abort();
	}

	fclose(locks);
#endif

	fl.l_type = F_UNLCK;
	fl.l_whence = SEEK_SET;
	fl.l_start = off;
	fl.l_len = len;
	fl.l_pid = 0;

#ifdef __OS2__
	int rc = 0;
	int lockFile = 0;
	/* Same handle selection as fcntl_lock(). */
	if (off == ACTIVE_LOCK || off == OPEN_LOCK || off == TRANSACTION_LOCK)
		lockFile = tdb->hActiveLock;
	else
		lockFile = tdb->fd;

	rc = fcntl(lockFile, F_SETLKW, &fl);
	// if the first unlock doesn't work and it's a complete unlock,
	// we split it in 2 parts. first hash size*4 and then the rest
	// as it was locked that way as well. and it seems fcntl() doesn't care
	if (rc != 0 && off == FREELIST_TOP && len == 0) {
		fl.l_len = tdb->header.hash_size * 4;
		rc = fcntl(lockFile, F_SETLKW, &fl);
		if (rc == 0) {
			fl.l_start = off + tdb->header.hash_size * 4;
			fl.l_len = 0;
			rc = fcntl(lockFile, F_SETLKW, &fl);
		}
	}

	TDB_LOG((tdb, TDB_DEBUG_TRACE,"fcntl_unlock: (fd=%d) offset=%lld rw_type=%d len=%lld (rc=%d) pid=%d\n",
		lockFile, off, rw, len, rc, getpid()));

	return rc;
#else
	return fcntl(tdb->fd, F_SETLKW, &fl);
#endif
}
183
184/* list -1 is the alloc list, otherwise a hash chain. */
185static tdb_off_t lock_offset(int list)
186{
187 return FREELIST_TOP + 4*list;
188}
189
/* a byte range locking function - return 0 on success
   this functions locks/unlocks 1 byte at the specified offset.

   On error, errno is also set so that errors are passed back properly
   through tdb_open().

   note that a len of zero means lock to end of file
*/
int tdb_brlock(struct tdb_context *tdb,
	       int rw_type, tdb_off_t offset, size_t len,
	       enum tdb_lock_flags flags)
{
	int ret;

	/* TDB_NOLOCK databases never touch the kernel lock table. */
	if (tdb->flags & TDB_NOLOCK) {
		return 0;
	}

	/* MARK_ONLY: record the lock in memory without taking it. */
	if (flags & TDB_LOCK_MARK_ONLY) {
		return 0;
	}

	/* Write locks are refused on read-only handles. */
	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
		tdb->ecode = TDB_ERR_RDONLY;
		return -1;
	}

	/* Retry on EINTR unless the registered signal flag was raised. */
	do {
		ret = fcntl_lock(tdb, rw_type, offset, len,
				 flags & TDB_LOCK_WAIT);
		/* Check for a sigalarm break. */
		if (ret == -1 && errno == EINTR &&
				tdb->interrupt_sig_ptr &&
				*tdb->interrupt_sig_ptr) {
			break;
		}
	} while (ret == -1 && errno == EINTR);

	if (ret == -1) {
		tdb->ecode = TDB_ERR_LOCK;
		/* Generic lock error. errno set by fcntl.
		 * EAGAIN is an expected return from non-blocking
		 * locks. */
		if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d flags=%d len=%d\n",
				 tdb->fd, offset, rw_type, flags, (int)len));
		}
		return -1;
	}
	return 0;
}
241
242int tdb_brunlock(struct tdb_context *tdb,
243 int rw_type, tdb_off_t offset, size_t len)
244{
245 int ret;
246
247 if (tdb->flags & TDB_NOLOCK) {
248 return 0;
249 }
250
251 do {
252 ret = fcntl_unlock(tdb, rw_type, offset, len);
253 } while (ret == -1 && errno == EINTR);
254
255 if (ret == -1) {
256 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brunlock failed (fd=%d) at offset %d rw_type=%d len=%d\n",
257 tdb->fd, offset, rw_type, (int)len));
258 }
259 return ret;
260}
261
/*
  upgrade a read lock to a write lock. This needs to be handled in a
  special way as some OSes (such as solaris) have too conservative
  deadlock detection and claim a deadlock when progress can be
  made. For those OSes we may loop for a while.
*/
int tdb_allrecord_upgrade(struct tdb_context *tdb)
{
	int count = 1000;	/* bounded retries against spurious EDEADLK */

	/* Upgrade is only valid on a single, non-nested allrecord hold. */
	if (tdb->allrecord_lock.count != 1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR,
			 "tdb_allrecord_upgrade failed: count %u too high\n",
			 tdb->allrecord_lock.count));
		return -1;
	}

	/* off == 1 marks an upgradable lock (see tdb_allrecord_lock);
	 * anything else was never upgradable or is already upgraded. */
	if (tdb->allrecord_lock.off != 1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR,
			 "tdb_allrecord_upgrade failed: already upgraded?\n"));
		return -1;
	}

	while (count--) {
		struct timeval tv;
#ifdef __OS2__
		// we need to remove locks, as upgrade doesn't work
		tdb_brunlock(tdb, F_RDLCK, FREELIST_TOP, 0);
#endif
		if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0,
			       TDB_LOCK_WAIT|TDB_LOCK_PROBE) == 0) {
			tdb->allrecord_lock.ltype = F_WRLCK;
			tdb->allrecord_lock.off = 0;
			return 0;
		}
		if (errno != EDEADLK) {
			break;
		}
		/* sleep for as short a time as we can - more portable than usleep() */
		tv.tv_sec = 0;
		tv.tv_usec = 1;
		select(0, NULL, NULL, NULL, &tv);
	}
	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_allrecord_upgrade failed\n"));
	return -1;
}
308
309static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb,
310 tdb_off_t offset)
311{
312 unsigned int i;
313
314 for (i=0; i<tdb->num_lockrecs; i++) {
315 if (tdb->lockrecs[i].off == offset) {
316 return &tdb->lockrecs[i];
317 }
318 }
319 return NULL;
320}
321
/* lock an offset in the database. */
int tdb_nest_lock(struct tdb_context *tdb, uint32_t offset, int ltype,
		  enum tdb_lock_flags flags)
{
	struct tdb_lock_type *new_lck;

	/* Valid offsets end at the last hash chain's lock byte. */
	if (offset >= lock_offset(tdb->header.hash_size)) {
		tdb->ecode = TDB_ERR_LOCK;
		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid offset %u for ltype=%d\n",
			 offset, ltype));
		return -1;
	}
	if (tdb->flags & TDB_NOLOCK)
		return 0;

	new_lck = find_nestlock(tdb, offset);
	if (new_lck) {
		/*
		 * Just increment the in-memory struct, posix locks
		 * don't stack.
		 */
		new_lck->count++;
		return 0;
	}

	/* Grow the array before taking the kernel lock; if the lock
	 * fails the spare slot is harmless and reused next time. */
	new_lck = (struct tdb_lock_type *)realloc(
		tdb->lockrecs,
		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
	if (new_lck == NULL) {
		errno = ENOMEM;
		return -1;
	}
	tdb->lockrecs = new_lck;

	/* Since fcntl locks don't nest, we do a lock for the first one,
	   and simply bump the count for future ones */
	if (tdb_brlock(tdb, ltype, offset, 1, flags)) {
		return -1;
	}

	tdb->lockrecs[tdb->num_lockrecs].off = offset;
	tdb->lockrecs[tdb->num_lockrecs].count = 1;
	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
	tdb->num_lockrecs++;

	return 0;
}
369
370static int tdb_lock_and_recover(struct tdb_context *tdb)
371{
372 int ret;
373
374 /* We need to match locking order in transaction commit. */
375 if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT)) {
376 return -1;
377 }
378
379 if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT)) {
380 tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
381 return -1;
382 }
383
384 ret = tdb_transaction_recover(tdb);
385
386 tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1);
387 tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
388
389 return ret;
390}
391
392static bool have_data_locks(const struct tdb_context *tdb)
393{
394 unsigned int i;
395
396 for (i = 0; i < tdb->num_lockrecs; i++) {
397 if (tdb->lockrecs[i].off >= lock_offset(-1))
398 return true;
399 }
400 return false;
401}
402
/* Take a chain lock (list >= 0) or the freelist lock (list == -1),
   running crash recovery first if this is our first data lock and a
   dead transaction is detected. */
static int tdb_lock_list(struct tdb_context *tdb, int list, int ltype,
			 enum tdb_lock_flags waitflag)
{
	int ret;
	bool check = false;

	/* a allrecord lock allows us to avoid per chain locks */
	if (tdb->allrecord_lock.count &&
	    (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
		return 0;
	}

	/* A write chain lock under an allrecord read lock is an error. */
	if (tdb->allrecord_lock.count) {
		tdb->ecode = TDB_ERR_LOCK;
		ret = -1;
	} else {
		/* Only check when we grab first data lock. */
		check = !have_data_locks(tdb);
		ret = tdb_nest_lock(tdb, lock_offset(list), ltype, waitflag);

		if (ret == 0 && check && tdb_needs_recovery(tdb)) {
			/* Drop our lock, recover, then retry from scratch. */
			tdb_nest_unlock(tdb, lock_offset(list), ltype, false);

			if (tdb_lock_and_recover(tdb) == -1) {
				return -1;
			}
			return tdb_lock_list(tdb, list, ltype, waitflag);
		}
	}
	return ret;
}
434
435/* lock a list in the database. list -1 is the alloc list */
436int tdb_lock(struct tdb_context *tdb, int list, int ltype)
437{
438 int ret;
439
440 ret = tdb_lock_list(tdb, list, ltype, TDB_LOCK_WAIT);
441 if (ret) {
442 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
443 "ltype=%d (%s)\n", list, ltype, strerror(errno)));
444 }
445 return ret;
446}
447
448/* lock a list in the database. list -1 is the alloc list. non-blocking lock */
449int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
450{
451 return tdb_lock_list(tdb, list, ltype, TDB_LOCK_NOWAIT);
452}
453
454
/* Release one hold on the lock byte at `offset`; the kernel lock is
   only dropped when the nesting count reaches zero. With mark_lock
   the in-memory record is removed without touching the kernel. */
int tdb_nest_unlock(struct tdb_context *tdb, uint32_t offset, int ltype,
		    bool mark_lock)
{
	int ret = -1;
	struct tdb_lock_type *lck;

	if (tdb->flags & TDB_NOLOCK)
		return 0;

	/* Sanity checks */
	if (offset >= lock_offset(tdb->header.hash_size)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: offset %u invalid (%d)\n", offset, tdb->header.hash_size));
		return ret;
	}

	/* Unlocking something we never locked is a caller bug. */
	lck = find_nestlock(tdb, offset);
	if ((lck == NULL) || (lck->count == 0)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
		return -1;
	}

	/* Nested hold: just decrement the refcount. */
	if (lck->count > 1) {
		lck->count--;
		return 0;
	}

	/*
	 * This lock has count==1 left, so we need to unlock it in the
	 * kernel. We don't bother with decrementing the in-memory array
	 * element, we're about to overwrite it with the last array element
	 * anyway.
	 */

	if (mark_lock) {
		ret = 0;
	} else {
		ret = tdb_brunlock(tdb, ltype, offset, 1);
	}

	/*
	 * Shrink the array by overwriting the element just unlocked with the
	 * last array element.
	 */
	*lck = tdb->lockrecs[--tdb->num_lockrecs];

	/*
	 * We don't bother with realloc when the array shrinks, but if we have
	 * a completely idle tdb we should get rid of the locked array.
	 */

	if (tdb->num_lockrecs == 0) {
		SAFE_FREE(tdb->lockrecs);
	}

	if (ret)
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
	return ret;
}
513
514int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
515{
516 /* a global lock allows us to avoid per chain locks */
517 if (tdb->allrecord_lock.count &&
518 (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
519 return 0;
520 }
521
522 if (tdb->allrecord_lock.count) {
523 tdb->ecode = TDB_ERR_LOCK;
524 return -1;
525 }
526
527 return tdb_nest_unlock(tdb, lock_offset(list), ltype, false);
528}
529
530/*
531 get the transaction lock
532 */
533int tdb_transaction_lock(struct tdb_context *tdb, int ltype,
534 enum tdb_lock_flags lockflags)
535{
536 return tdb_nest_lock(tdb, TRANSACTION_LOCK, ltype, lockflags);
537}
538
539/*
540 release the transaction lock
541 */
542int tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
543{
544 return tdb_nest_unlock(tdb, TRANSACTION_LOCK, ltype, false);
545}
546
547/* Returns 0 if all done, -1 if error, 1 if ok. */
548static int tdb_allrecord_check(struct tdb_context *tdb, int ltype,
549 enum tdb_lock_flags flags, bool upgradable)
550{
551 /* There are no locks on read-only dbs */
552 if (tdb->read_only || tdb->traverse_read) {
553 tdb->ecode = TDB_ERR_LOCK;
554 return -1;
555 }
556
557 if (tdb->allrecord_lock.count && tdb->allrecord_lock.ltype == ltype) {
558 tdb->allrecord_lock.count++;
559 return 0;
560 }
561
562 if (tdb->allrecord_lock.count) {
563 /* a global lock of a different type exists */
564 tdb->ecode = TDB_ERR_LOCK;
565 return -1;
566 }
567
568 if (tdb_have_extra_locks(tdb)) {
569 /* can't combine global and chain locks */
570 tdb->ecode = TDB_ERR_LOCK;
571 return -1;
572 }
573
574 if (upgradable && ltype != F_RDLCK) {
575 /* tdb error: you can't upgrade a write lock! */
576 tdb->ecode = TDB_ERR_LOCK;
577 return -1;
578 }
579 return 1;
580}
581
/* We only need to lock individual bytes, but Linux merges consecutive locks
 * so we lock in contiguous ranges. */
static int tdb_chainlock_gradual(struct tdb_context *tdb,
				 int ltype, enum tdb_lock_flags flags,
				 size_t off, size_t len)
{
	int ret;
	enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);

	if (len <= 4) {
		/* Single record. Just do blocking lock. */
		return tdb_brlock(tdb, ltype, off, len, flags);
	}

	/* First we try non-blocking. */
	ret = tdb_brlock(tdb, ltype, off, len, nb_flags);
	if (ret == 0) {
		return 0;
	}

	/* Try locking first half, then second. */
	ret = tdb_chainlock_gradual(tdb, ltype, flags, off, len / 2);
	if (ret == -1)
		return -1;

	ret = tdb_chainlock_gradual(tdb, ltype, flags,
				    off + len / 2, len - len / 2);
	if (ret == -1) {
		/* Second half failed: release the first half we took. */
		tdb_brunlock(tdb, ltype, off, len / 2);
		return -1;
	}
	return 0;
}
615
/* lock/unlock entire database. It can only be upgradable if you have some
 * other way of guaranteeing exclusivity (ie. transaction write lock).
 * We do the locking gradually to avoid being starved by smaller locks. */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
		       enum tdb_lock_flags flags, bool upgradable)
{
	/* Returns 1 when we actually need to take the lock. */
	switch (tdb_allrecord_check(tdb, ltype, flags, upgradable)) {
	case -1:
		return -1;
	case 0:
		return 0;
	}

	/* We cover two kinds of locks:
	 * 1) Normal chain locks. Taken for almost all operations.
	 * 2) Individual records locks. Taken after normal or free
	 *    chain locks.
	 *
	 * It is (1) which cause the starvation problem, so we're only
	 * gradual for that. */
	if (tdb_chainlock_gradual(tdb, ltype, flags, FREELIST_TOP,
				  tdb->header.hash_size * 4) == -1) {
		return -1;
	}

	/* Grab individual record locks. */
	if (tdb_brlock(tdb, ltype, lock_offset(tdb->header.hash_size), 0,
		       flags) == -1) {
		tdb_brunlock(tdb, ltype, FREELIST_TOP,
			     tdb->header.hash_size * 4);
		return -1;
	}

	tdb->allrecord_lock.count = 1;
	/* If it's upgradable, it's actually exclusive so we can treat
	 * it as a write lock. */
	tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
	tdb->allrecord_lock.off = upgradable;

	/* A crashed transaction may have left a recovery area; handle
	 * it now that we hold everything. */
	if (tdb_needs_recovery(tdb)) {
		bool mark = flags & TDB_LOCK_MARK_ONLY;
		tdb_allrecord_unlock(tdb, ltype, mark);
		if (mark) {
			tdb->ecode = TDB_ERR_LOCK;
			TDB_LOG((tdb, TDB_DEBUG_ERROR,
				 "tdb_lockall_mark cannot do recovery\n"));
			return -1;
		}
		if (tdb_lock_and_recover(tdb) == -1) {
			return -1;
		}
		return tdb_allrecord_lock(tdb, ltype, flags, upgradable);
	}

	return 0;
}
672
673
674
/* unlock entire db */
int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype, bool mark_lock)
{
	/* There are no locks on read-only dbs */
	if (tdb->read_only || tdb->traverse_read) {
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	/* Nothing to release. */
	if (tdb->allrecord_lock.count == 0) {
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	/* Upgradable locks are marked as write locks. */
	if (tdb->allrecord_lock.ltype != ltype
	    && (!tdb->allrecord_lock.off || ltype != F_RDLCK)) {
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	/* Nested hold: just drop the refcount. */
	if (tdb->allrecord_lock.count > 1) {
		tdb->allrecord_lock.count--;
		return 0;
	}

	/* Last reference: drop the kernel lock unless only marked. */
	if (!mark_lock && tdb_brunlock(tdb, ltype, FREELIST_TOP, 0)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
		return -1;
	}

	tdb->allrecord_lock.count = 0;
	tdb->allrecord_lock.ltype = 0;

	return 0;
}
711
712/* lock entire database with write lock */
713_PUBLIC_ int tdb_lockall(struct tdb_context *tdb)
714{
715 tdb_trace(tdb, "tdb_lockall");
716 return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
717}
718
719/* lock entire database with write lock - mark only */
720_PUBLIC_ int tdb_lockall_mark(struct tdb_context *tdb)
721{
722 tdb_trace(tdb, "tdb_lockall_mark");
723 return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_MARK_ONLY, false);
724}
725
726/* unlock entire database with write lock - unmark only */
727_PUBLIC_ int tdb_lockall_unmark(struct tdb_context *tdb)
728{
729 tdb_trace(tdb, "tdb_lockall_unmark");
730 return tdb_allrecord_unlock(tdb, F_WRLCK, true);
731}
732
733/* lock entire database with write lock - nonblocking varient */
734_PUBLIC_ int tdb_lockall_nonblock(struct tdb_context *tdb)
735{
736 int ret = tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_NOWAIT, false);
737 tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
738 return ret;
739}
740
741/* unlock entire database with write lock */
742_PUBLIC_ int tdb_unlockall(struct tdb_context *tdb)
743{
744 tdb_trace(tdb, "tdb_unlockall");
745 return tdb_allrecord_unlock(tdb, F_WRLCK, false);
746}
747
748/* lock entire database with read lock */
749_PUBLIC_ int tdb_lockall_read(struct tdb_context *tdb)
750{
751 tdb_trace(tdb, "tdb_lockall_read");
752 return tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false);
753}
754
755/* lock entire database with read lock - nonblock varient */
756_PUBLIC_ int tdb_lockall_read_nonblock(struct tdb_context *tdb)
757{
758 int ret = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_NOWAIT, false);
759 tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
760 return ret;
761}
762
763/* unlock entire database with read lock */
764_PUBLIC_ int tdb_unlockall_read(struct tdb_context *tdb)
765{
766 tdb_trace(tdb, "tdb_unlockall_read");
767 return tdb_allrecord_unlock(tdb, F_RDLCK, false);
768}
769
770/* lock/unlock one hash chain. This is meant to be used to reduce
771 contention - it cannot guarantee how many records will be locked */
772_PUBLIC_ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
773{
774 int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
775 tdb_trace_1rec(tdb, "tdb_chainlock", key);
776 return ret;
777}
778
779/* lock/unlock one hash chain, non-blocking. This is meant to be used
780 to reduce contention - it cannot guarantee how many records will be
781 locked */
782_PUBLIC_ int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
783{
784 int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
785 tdb_trace_1rec_ret(tdb, "tdb_chainlock_nonblock", key, ret);
786 return ret;
787}
788
789/* mark a chain as locked without actually locking it. Warning! use with great caution! */
790_PUBLIC_ int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
791{
792 int ret = tdb_nest_lock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))),
793 F_WRLCK, TDB_LOCK_MARK_ONLY);
794 tdb_trace_1rec(tdb, "tdb_chainlock_mark", key);
795 return ret;
796}
797
798/* unmark a chain as locked without actually locking it. Warning! use with great caution! */
799_PUBLIC_ int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
800{
801 tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key);
802 return tdb_nest_unlock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))),
803 F_WRLCK, true);
804}
805
806_PUBLIC_ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
807{
808 tdb_trace_1rec(tdb, "tdb_chainunlock", key);
809 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
810}
811
812_PUBLIC_ int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
813{
814 int ret;
815 ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
816 tdb_trace_1rec(tdb, "tdb_chainlock_read", key);
817 return ret;
818}
819
820_PUBLIC_ int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
821{
822 tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
823 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
824}
825
826/* record lock stops delete underneath */
827int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
828{
829 if (tdb->allrecord_lock.count) {
830 return 0;
831 }
832 return off ? tdb_brlock(tdb, F_RDLCK, off, 1, TDB_LOCK_WAIT) : 0;
833}
834
835/*
836 Write locks override our own fcntl readlocks, so check it here.
837 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
838 an error to fail to get the lock here.
839*/
840int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
841{
842 struct tdb_traverse_lock *i;
843 for (i = &tdb->travlocks; i; i = i->next)
844 if (i->off == off)
845 return -1;
846 if (tdb->allrecord_lock.count) {
847 if (tdb->allrecord_lock.ltype == F_WRLCK) {
848 return 0;
849 }
850 return -1;
851 }
852 return tdb_brlock(tdb, F_WRLCK, off, 1, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
853}
854
855int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
856{
857 if (tdb->allrecord_lock.count) {
858 return 0;
859 }
860 return tdb_brunlock(tdb, F_WRLCK, off, 1);
861}
862
863/* fcntl locks don't stack: avoid unlocking someone else's */
864int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
865{
866 struct tdb_traverse_lock *i;
867 uint32_t count = 0;
868
869 if (tdb->allrecord_lock.count) {
870 return 0;
871 }
872
873 if (off == 0)
874 return 0;
875 for (i = &tdb->travlocks; i; i = i->next)
876 if (i->off == off)
877 count++;
878 return (count == 1 ? tdb_brunlock(tdb, F_RDLCK, off, 1) : 0);
879}
880
881bool tdb_have_extra_locks(struct tdb_context *tdb)
882{
883 unsigned int extra = tdb->num_lockrecs;
884
885 /* A transaction holds the lock for all records. */
886 if (!tdb->transaction && tdb->allrecord_lock.count) {
887 return true;
888 }
889
890 /* We always hold the active lock if CLEAR_IF_FIRST. */
891 if (find_nestlock(tdb, ACTIVE_LOCK)) {
892 extra--;
893 }
894
895 /* In a transaction, we expect to hold the transaction lock */
896 if (tdb->transaction && find_nestlock(tdb, TRANSACTION_LOCK)) {
897 extra--;
898 }
899
900 return extra;
901}
902
/* The transaction code uses this to remove all locks. */
void tdb_release_transaction_locks(struct tdb_context *tdb)
{
	unsigned int i, active = 0;

	/* Drop the allrecord lock if the transaction held one. */
	if (tdb->allrecord_lock.count != 0) {
		tdb_brunlock(tdb, tdb->allrecord_lock.ltype, FREELIST_TOP, 0);
		tdb->allrecord_lock.count = 0;
	}

	for (i=0;i<tdb->num_lockrecs;i++) {
		struct tdb_lock_type *lck = &tdb->lockrecs[i];

		/* Don't release the active lock! Copy it to first entry. */
		if (lck->off == ACTIVE_LOCK) {
			tdb->lockrecs[active++] = *lck;
		} else {
			tdb_brunlock(tdb, lck->ltype, lck->off, 1);
		}
	}
	tdb->num_lockrecs = active;
	/* Free the array when idle, matching tdb_nest_unlock(). */
	if (tdb->num_lockrecs == 0) {
		SAFE_FREE(tdb->lockrecs);
	}
}
Note: See TracBrowser for help on using the repository browser.