source: trunk/server/lib/tdb/common/lock.c@ 895

Last change on this file since 895 was 895, checked in by Silvan Scherrer, 9 years ago

tdb: Use DosSetFileLocks directly for db locks.

This is to overcome some fcntl() API incompleteness in kLIBC
(like inability to upgrade locks or join adjacent lock regions
into one). It made the torture test run a bit better but there
are still two major problems both coming from DosSetFileLocks
impl (which fcntl() is currently based upon too): a) inability to
detect deadlocks and b) missing atomic unlock/lock support if
unlock/lock regions don't match.

With the current implementation, tdbtorture works fine for 1 or 2
worker processes but hangs when there are 3 or more. [Before that,
it would only work with 1 process and would likely corrupt
the database and terminate if there were 2 or more processes].

Author: Dmitriy Kuminov (@dmik).

File size: 26.8 KB
Line 
1 /*
2 Unix SMB/CIFS implementation.
3
4 trivial database library
5
6 Copyright (C) Andrew Tridgell 1999-2005
7 Copyright (C) Paul `Rusty' Russell 2000
8 Copyright (C) Jeremy Allison 2000-2003
9
10 ** NOTE! The following LGPL license applies to the tdb
11 ** library. This does NOT imply that all of Samba is released
12 ** under the LGPL
13
14 This library is free software; you can redistribute it and/or
15 modify it under the terms of the GNU Lesser General Public
16 License as published by the Free Software Foundation; either
17 version 3 of the License, or (at your option) any later version.
18
19 This library is distributed in the hope that it will be useful,
20 but WITHOUT ANY WARRANTY; without even the implied warranty of
21 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22 Lesser General Public License for more details.
23
24 You should have received a copy of the GNU Lesser General Public
25 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26*/
27
28#include "tdb_private.h"
29
/* Register a signal-interrupt flag: while blocked in a lock wait, an
 * EINTR is treated as fatal (the wait is abandoned) whenever *ptr is
 * non-zero; otherwise interrupted waits are silently retried. */
_PUBLIC_ void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
{
	tdb->interrupt_sig_ptr = ptr;
}
34
35#ifdef __OS2__
/* Request flags for os2_set_file_locks(). */
enum os2_fl {
	OS2_FL_LOCK = 0x1,	/* acquire a lock (absent: release one) */
	OS2_FL_RW = 0x2,	/* exclusive (write) lock; shared otherwise */
	OS2_FL_WAIT = 0x4,	/* block until the region is available */
	OS2_FL_UPGRADE = 0x8,	/* atomically re-acquire the same region */
};
42
43static int os2_set_file_locks(struct tdb_context *tdb, enum os2_fl flags,
44 off_t start, off_t offset, off_t len)
45{
46 FILELOCKL locks[2] = {0};
47 ULONG fl;
48 BOOL dowait;
49 APIRET arc;
50
51 if ((!(flags & OS2_FL_UPGRADE) && (start > offset || start < 0)) ||
52 offset < 0 || offset + len < 0) {
53 errno = EINVAL;
54 return -1;
55 }
56
57 fl = (flags & OS2_FL_RW) ? 0 : 1; /* excluive when rw, shared otherwise */
58
59 if (flags & OS2_FL_UPGRADE) {
60 locks[0].lOffset = offset;
61 locks[0].lRange = len == 0 ? OFF_MAX : len;
62 locks[1] = locks[0];
63 fl |= 2; /* atomic */
64 } else {
65 if (start != offset) {
66 locks[0].lOffset = start;
67 locks[0].lRange = offset - start;
68 fl |= 2; /* atomic */
69 }
70 locks[1].lOffset = start;
71 locks[1].lRange = len == 0 ? OFF_MAX : (locks[0].lRange + len);
72 }
73
74 dowait = (flags & (OS2_FL_LOCK | OS2_FL_UPGRADE)) && (flags & OS2_FL_WAIT);
75
76 TDB_LOG((tdb, TDB_DEBUG_TRACE, "os2_file_locks: fd=%d lock_fd=%d flags=%x start=%lld offset=%lld len=%lld pid=%d\n",
77 tdb->fd, tdb->lock_fd, flags, start, offset, len, getpid()));
78
79 arc = DosSetFileLocksL(tdb->lock_fd,
80 &locks[(flags & OS2_FL_LOCK) ? 0 : 1],
81 &locks[(flags & OS2_FL_LOCK) ? 1 : 0],
82 dowait ? SEM_INDEFINITE_WAIT : SEM_IMMEDIATE_RETURN,
83 fl);
84
85 TDB_LOG((tdb, TDB_DEBUG_TRACE, "os2_file_locks: arc=%d pid=%d\n", arc, getpid()));
86
87 if (arc) {
88 switch (arc) {
89 case ERROR_LOCK_VIOLATION:
90 errno = EACCES;
91 break;
92 case ERROR_INTERRUPT:
93 errno = EINTR;
94 break;
95 case ERROR_TIMEOUT:
96 errno = EDEADLK;
97 break;
98 default:
99 TDB_LOG((tdb, TDB_DEBUG_ERROR, "os2_file_locks failed, lock_fd=%d flags=%x start=%d off=%d len=%d (arc=%d)\n",
100 tdb->lock_fd, flags, start, offset, len, arc));
101 }
102 return -1;
103 }
104
105 return 0;
106}
107#else /* __OS2__ */
108static int fcntl_lock(struct tdb_context *tdb,
109 int rw, off_t off, off_t len, bool waitflag)
110{
111 struct flock fl;
112
113 fl.l_type = rw;
114 fl.l_whence = SEEK_SET;
115 fl.l_start = off;
116 fl.l_len = len;
117 fl.l_pid = 0;
118
119 if (waitflag)
120 return fcntl(tdb->fd, F_SETLKW, &fl);
121 else
122 return fcntl(tdb->fd, F_SETLK, &fl);
123}
124
/* Release a POSIX byte-range lock on the tdb file descriptor.
 * rw is the lock type that was held; len == 0 means to end of file.
 * Returns the fcntl() result: 0 on success, -1 with errno set. */
static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
{
	struct flock fl;
	/* Disabled debug harness: parses Linux's /proc/locks to verify that
	 * every unlock exactly matches a lock this process holds. */
#if 0 /* Check they matched up locks and unlocks correctly. */
	char line[80];
	FILE *locks;
	bool found = false;

	locks = fopen("/proc/locks", "r");

	while (fgets(line, 80, locks)) {
		char *p;
		int type, start, l;

		/* eg. 1: FLOCK ADVISORY WRITE 2440 08:01:2180826 0 EOF */
		p = strchr(line, ':') + 1;
		if (strncmp(p, " POSIX ADVISORY ", strlen(" POSIX ADVISORY ")))
			continue;
		p += strlen(" FLOCK ADVISORY ");
		if (strncmp(p, "READ ", strlen("READ ")) == 0)
			type = F_RDLCK;
		else if (strncmp(p, "WRITE ", strlen("WRITE ")) == 0)
			type = F_WRLCK;
		else
			abort();
		p += 6;
		if (atoi(p) != getpid())
			continue;
		p = strchr(strchr(p, ' ') + 1, ' ') + 1;
		start = atoi(p);
		p = strchr(p, ' ') + 1;
		if (strncmp(p, "EOF", 3) == 0)
			l = 0;
		else
			l = atoi(p) - start + 1;

		if (off == start) {
			if (len != l) {
				fprintf(stderr, "Len %u should be %u: %s",
					(int)len, l, line);
				abort();
			}
			if (type != rw) {
				fprintf(stderr, "Type %s wrong: %s",
					rw == F_RDLCK ? "READ" : "WRITE", line);
				abort();
			}
			found = true;
			break;
		}
	}

	if (!found) {
		fprintf(stderr, "Unlock on %u@%u not found!\n",
			(int)off, (int)len);
		abort();
	}

	fclose(locks);
#endif

	fl.l_type = F_UNLCK;
	fl.l_whence = SEEK_SET;
	fl.l_start = off;
	fl.l_len = len;
	fl.l_pid = 0;

	return fcntl(tdb->fd, F_SETLKW, &fl);
}
194#endif /* __OS2__ */
195
196/* list -1 is the alloc list, otherwise a hash chain. */
197static tdb_off_t lock_offset(int list)
198{
199 return FREELIST_TOP + 4*list;
200}
201
/* a byte range locking function - return 0 on success
   this functions locks/unlocks 1 byte at the specified offset.

   On error, errno is also set so that errors are passed back properly
   through tdb_open().

   note that a len of zero means lock to end of file

   On OS/2 the extended form also takes 'start': when start < offset, an
   existing lock on [start, offset) is atomically extended to cover the
   new range in a single DosSetFileLocksL call (kLIBC's fcntl cannot
   join adjacent lock regions).
*/
#ifdef __OS2__
static int tdb_brlock_ex(struct tdb_context *tdb,
			 int rw_type, tdb_off_t start,
			 tdb_off_t offset, size_t len,
			 enum tdb_lock_flags flags)
#else
int tdb_brlock(struct tdb_context *tdb,
	       int rw_type, tdb_off_t offset, size_t len,
	       enum tdb_lock_flags flags)
#endif
{
	int ret;

	/* TDB_NOLOCK databases never take kernel locks. */
	if (tdb->flags & TDB_NOLOCK) {
		return 0;
	}

	/* Mark-only callers record the lock without taking it. */
	if (flags & TDB_LOCK_MARK_ONLY) {
		return 0;
	}

	/* A write lock is impossible on a read-only handle. */
	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
		tdb->ecode = TDB_ERR_RDONLY;
		return -1;
	}

#ifdef __OS2__
	/* Translate the fcntl-style arguments for os2_set_file_locks(). */
	int os2_flags = OS2_FL_LOCK;
	if (rw_type == F_WRLCK) {
		os2_flags |= OS2_FL_RW;
	}
	if (flags & TDB_LOCK_WAIT) {
		os2_flags |= OS2_FL_WAIT;
	}
	if (flags & TDB_LOCK_UPGRADE) {
		os2_flags |= OS2_FL_UPGRADE;
	}
#endif

	/* Retry interrupted waits unless the caller's sigptr is raised. */
	do {
#ifdef __OS2__
		ret = os2_set_file_locks(tdb, os2_flags, start, offset, len);
#else
		ret = fcntl_lock(tdb, rw_type, offset, len,
				 flags & TDB_LOCK_WAIT);
#endif
		/* Check for a sigalarm break. */
		if (ret == -1 && errno == EINTR &&
		    tdb->interrupt_sig_ptr &&
		    *tdb->interrupt_sig_ptr) {
			break;
		}
	} while (ret == -1 && errno == EINTR);

	if (ret == -1) {
		tdb->ecode = TDB_ERR_LOCK;
		/* Generic lock error. errno set by fcntl.
		 * EAGAIN is an expected return from non-blocking
		 * locks. */
		if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d flags=%d len=%d (errno=%d)\n",
				 tdb->fd, offset, rw_type, flags, (int)len, errno));
		}
		return -1;
	}
	return 0;
}
277
#ifdef __OS2__
/* Standard entry point: no adjacent held region, so start == offset. */
int tdb_brlock(struct tdb_context *tdb,
	       int rw_type, tdb_off_t offset, size_t len,
	       enum tdb_lock_flags flags)
{
	return tdb_brlock_ex(tdb, rw_type, offset, offset, len, flags);
}
#endif
286
#ifdef __OS2__
/* Release a byte-range lock taken with tdb_brlock(_ex)().  The OS/2 form
 * also takes 'start': when start < offset, only [offset, offset+len) is
 * dropped and the remainder [start, offset) is atomically re-locked (see
 * os2_set_file_locks). */
static int tdb_brunlock_ex(struct tdb_context *tdb,
			   int rw_type, tdb_off_t start,
			   tdb_off_t offset, size_t len)
#else
/* Release a byte-range lock previously taken with tdb_brlock(). */
int tdb_brunlock(struct tdb_context *tdb,
		 int rw_type, tdb_off_t offset, size_t len)
#endif
{
	int ret;

	if (tdb->flags & TDB_NOLOCK) {
		return 0;
	}

	do {
#ifdef __OS2__
		/* No OS2_FL_LOCK bit: this is a release request. */
		ret = os2_set_file_locks(tdb, rw_type == F_WRLCK ? OS2_FL_RW : 0,
					 start, offset, len);
#else
		ret = fcntl_unlock(tdb, rw_type, offset, len);
#endif
	} while (ret == -1 && errno == EINTR);

	if (ret == -1) {
		TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brunlock failed (fd=%d) at offset %d rw_type=%d len=%d (errno=%d)\n",
			 tdb->fd, offset, rw_type, (int)len, errno));
	}
	return ret;
}
317
#ifdef __OS2__
/* Standard entry point: no adjacent held region, so start == offset. */
int tdb_brunlock(struct tdb_context *tdb,
		 int rw_type, tdb_off_t offset, size_t len)
{
	return tdb_brunlock_ex(tdb, rw_type, offset, offset, len);
}
#endif
325
326/*
327 upgrade a read lock to a write lock. This needs to be handled in a
328 special way as some OSes (such as solaris) have too conservative
329 deadlock detection and claim a deadlock when progress can be
330 made. For those OSes we may loop for a while.
331*/
332int tdb_allrecord_upgrade(struct tdb_context *tdb)
333{
334 int count = 1000;
335
336 if (tdb->allrecord_lock.count != 1) {
337 TDB_LOG((tdb, TDB_DEBUG_ERROR,
338 "tdb_allrecord_upgrade failed: count %u too high\n",
339 tdb->allrecord_lock.count));
340 return -1;
341 }
342
343 if (tdb->allrecord_lock.off != 1) {
344 TDB_LOG((tdb, TDB_DEBUG_ERROR,
345 "tdb_allrecord_upgrade failed: already upgraded?\n"));
346 return -1;
347 }
348
349 while (count--) {
350 struct timeval tv;
351 if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0,
352 TDB_LOCK_WAIT|TDB_LOCK_PROBE|TDB_LOCK_UPGRADE) == 0) {
353 tdb->allrecord_lock.ltype = F_WRLCK;
354 tdb->allrecord_lock.off = 0;
355 return 0;
356 }
357 if (errno != EDEADLK) {
358 break;
359 }
360 /* sleep for as short a time as we can - more portable than usleep() */
361 tv.tv_sec = 0;
362 tv.tv_usec = 1;
363 select(0, NULL, NULL, NULL, &tv);
364 }
365 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_allrecord_upgrade failed\n"));
366 return -1;
367}
368
369static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb,
370 tdb_off_t offset)
371{
372 unsigned int i;
373
374 for (i=0; i<tdb->num_lockrecs; i++) {
375 if (tdb->lockrecs[i].off == offset) {
376 return &tdb->lockrecs[i];
377 }
378 }
379 return NULL;
380}
381
382/* lock an offset in the database. */
383int tdb_nest_lock(struct tdb_context *tdb, uint32_t offset, int ltype,
384 enum tdb_lock_flags flags)
385{
386 struct tdb_lock_type *new_lck;
387
388 if (offset >= lock_offset(tdb->header.hash_size)) {
389 tdb->ecode = TDB_ERR_LOCK;
390 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid offset %u for ltype=%d\n",
391 offset, ltype));
392 return -1;
393 }
394 if (tdb->flags & TDB_NOLOCK)
395 return 0;
396
397 new_lck = find_nestlock(tdb, offset);
398 if (new_lck) {
399 /*
400 * Just increment the in-memory struct, posix locks
401 * don't stack.
402 */
403 new_lck->count++;
404 return 0;
405 }
406
407 new_lck = (struct tdb_lock_type *)realloc(
408 tdb->lockrecs,
409 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
410 if (new_lck == NULL) {
411 errno = ENOMEM;
412 return -1;
413 }
414 tdb->lockrecs = new_lck;
415
416 /* Since fcntl locks don't nest, we do a lock for the first one,
417 and simply bump the count for future ones */
418 if (tdb_brlock(tdb, ltype, offset, 1, flags)) {
419 return -1;
420 }
421
422 tdb->lockrecs[tdb->num_lockrecs].off = offset;
423 tdb->lockrecs[tdb->num_lockrecs].count = 1;
424 tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
425 tdb->num_lockrecs++;
426
427 return 0;
428}
429
430static int tdb_lock_and_recover(struct tdb_context *tdb)
431{
432 int ret;
433
434 /* We need to match locking order in transaction commit. */
435 if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT)) {
436 return -1;
437 }
438
439 if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT)) {
440 tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
441 return -1;
442 }
443
444 ret = tdb_transaction_recover(tdb);
445
446 tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1);
447 tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
448
449 return ret;
450}
451
452static bool have_data_locks(const struct tdb_context *tdb)
453{
454 unsigned int i;
455
456 for (i = 0; i < tdb->num_lockrecs; i++) {
457 if (tdb->lockrecs[i].off >= lock_offset(-1))
458 return true;
459 }
460 return false;
461}
462
/* Lock one hash chain (or the free list for list == -1).  An allrecord
 * lock of a compatible type makes this a no-op; on the first data lock
 * we also check whether crash recovery is needed and run it.
 * Returns 0 on success, -1 on error (tdb->ecode set). */
static int tdb_lock_list(struct tdb_context *tdb, int list, int ltype,
			 enum tdb_lock_flags waitflag)
{
	int ret;
	bool check = false;

	/* a allrecord lock allows us to avoid per chain locks */
	if (tdb->allrecord_lock.count &&
	    (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
		return 0;
	}

	if (tdb->allrecord_lock.count) {
		/* Incompatible with the allrecord lock we hold. */
		tdb->ecode = TDB_ERR_LOCK;
		ret = -1;
	} else {
		/* Only check when we grab first data lock. */
		check = !have_data_locks(tdb);
		ret = tdb_nest_lock(tdb, lock_offset(list), ltype, waitflag);

		if (ret == 0 && check && tdb_needs_recovery(tdb)) {
			/* Drop our lock, recover under the proper locks,
			 * then retry the whole operation from scratch. */
			tdb_nest_unlock(tdb, lock_offset(list), ltype, false);

			if (tdb_lock_and_recover(tdb) == -1) {
				return -1;
			}
			return tdb_lock_list(tdb, list, ltype, waitflag);
		}
	}
	return ret;
}
494
495/* lock a list in the database. list -1 is the alloc list */
496int tdb_lock(struct tdb_context *tdb, int list, int ltype)
497{
498 int ret;
499
500 ret = tdb_lock_list(tdb, list, ltype, TDB_LOCK_WAIT);
501 if (ret) {
502 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
503 "ltype=%d (%s)\n", list, ltype, strerror(errno)));
504 }
505 return ret;
506}
507
/* lock a list in the database. list -1 is the alloc list. non-blocking lock */
int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
{
	/* Same as tdb_lock() but fails immediately instead of waiting. */
	return tdb_lock_list(tdb, list, ltype, TDB_LOCK_NOWAIT);
}
513
514
/* Drop one nesting level of the lock at 'offset'.  The kernel lock is
 * only released when the count reaches zero; with mark_lock it is just
 * recorded as released.  Returns 0 on success, -1 on error. */
int tdb_nest_unlock(struct tdb_context *tdb, uint32_t offset, int ltype,
		    bool mark_lock)
{
	int ret = -1;
	struct tdb_lock_type *lck;

	if (tdb->flags & TDB_NOLOCK)
		return 0;

	/* Sanity checks */
	if (offset >= lock_offset(tdb->header.hash_size)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: offset %u invalid (%d)\n", offset, tdb->header.hash_size));
		return ret;
	}

	/* Unlocking something we never locked is a caller bug. */
	lck = find_nestlock(tdb, offset);
	if ((lck == NULL) || (lck->count == 0)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
		return -1;
	}

	if (lck->count > 1) {
		lck->count--;
		return 0;
	}

	/*
	 * This lock has count==1 left, so we need to unlock it in the
	 * kernel. We don't bother with decrementing the in-memory array
	 * element, we're about to overwrite it with the last array element
	 * anyway.
	 */

	if (mark_lock) {
		ret = 0;
	} else {
		ret = tdb_brunlock(tdb, ltype, offset, 1);
	}

	/*
	 * Shrink the array by overwriting the element just unlocked with the
	 * last array element.
	 */
	*lck = tdb->lockrecs[--tdb->num_lockrecs];

	/*
	 * We don't bother with realloc when the array shrinks, but if we have
	 * a completely idle tdb we should get rid of the locked array.
	 */

	if (tdb->num_lockrecs == 0) {
		SAFE_FREE(tdb->lockrecs);
	}

	if (ret)
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
	return ret;
}
573
574int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
575{
576 /* a global lock allows us to avoid per chain locks */
577 if (tdb->allrecord_lock.count &&
578 (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
579 return 0;
580 }
581
582 if (tdb->allrecord_lock.count) {
583 tdb->ecode = TDB_ERR_LOCK;
584 return -1;
585 }
586
587 return tdb_nest_unlock(tdb, lock_offset(list), ltype, false);
588}
589
590/*
591 get the transaction lock
592 */
593int tdb_transaction_lock(struct tdb_context *tdb, int ltype,
594 enum tdb_lock_flags lockflags)
595{
596 return tdb_nest_lock(tdb, TRANSACTION_LOCK, ltype, lockflags);
597}
598
599/*
600 release the transaction lock
601 */
602int tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
603{
604 return tdb_nest_unlock(tdb, TRANSACTION_LOCK, ltype, false);
605}
606
607/* Returns 0 if all done, -1 if error, 1 if ok. */
608static int tdb_allrecord_check(struct tdb_context *tdb, int ltype,
609 enum tdb_lock_flags flags, bool upgradable)
610{
611 /* There are no locks on read-only dbs */
612 if (tdb->read_only || tdb->traverse_read) {
613 tdb->ecode = TDB_ERR_LOCK;
614 return -1;
615 }
616
617 if (tdb->allrecord_lock.count && tdb->allrecord_lock.ltype == ltype) {
618 tdb->allrecord_lock.count++;
619 return 0;
620 }
621
622 if (tdb->allrecord_lock.count) {
623 /* a global lock of a different type exists */
624 tdb->ecode = TDB_ERR_LOCK;
625 return -1;
626 }
627
628 if (tdb_have_extra_locks(tdb)) {
629 /* can't combine global and chain locks */
630 tdb->ecode = TDB_ERR_LOCK;
631 return -1;
632 }
633
634 if (upgradable && ltype != F_RDLCK) {
635 /* tdb error: you can't upgrade a write lock! */
636 tdb->ecode = TDB_ERR_LOCK;
637 return -1;
638 }
639 return 1;
640}
641
/* Adapter macros hiding the OS/2-only 'start' parameter of the _ex lock
 * routines.  On other platforms 'start' is swallowed by the macro.
 * Both helper spellings (TDB_ADJLOCK_START / TDB_ADJLOCK_START_REF) are
 * defined in BOTH branches: previously each branch defined only one of
 * them, so any code using either name failed to build on the other
 * platform. */
#ifdef __OS2__
#define TDB_ADJLOCK_START_DECL size_t start,
#define TDB_ADJLOCK_START start,
#define TDB_ADJLOCK_START_REF start,
#define TDB_ADJLOCK(tdb, rw_type, start, offset, len, flags) \
	tdb_brlock_ex(tdb, rw_type, start, offset, len, flags)
#define TDB_ADJUNLOCK(tdb, rw_type, start, offset, len) \
	tdb_brunlock_ex(tdb, rw_type, start, offset, len)
#define TDB_CHAINLOCK_GRADUAL(tdb, ltype, flags, start, offset, len) \
	tdb_chainlock_gradual(tdb, ltype, flags, start, offset, len)
#else
#define TDB_ADJLOCK_START_DECL
#define TDB_ADJLOCK_START
#define TDB_ADJLOCK_START_REF
#define TDB_ADJLOCK(tdb, rw_type, start, offset, len, flags) \
	tdb_brlock(tdb, rw_type, offset, len, flags)
#define TDB_ADJUNLOCK(tdb, rw_type, start, offset, len) \
	tdb_brunlock(tdb, rw_type, offset, len)
#define TDB_CHAINLOCK_GRADUAL(tdb, ltype, flags, start, offset, len) \
	tdb_chainlock_gradual(tdb, ltype, flags, offset, len)
#endif
661
/* We only need to lock individual bytes, but Linux merges consecutive locks
 * so we lock in contiguous ranges.  Divide and conquer: first try the whole
 * range non-blocking; on contention, lock each half (recursively) with
 * blocking locks, so large lockers are not starved by small ones.
 * On OS/2 the extra 'start' argument threads the base of the already-held
 * prefix through the recursion via the TDB_ADJLOCK* macros. */
static int tdb_chainlock_gradual(struct tdb_context *tdb,
				 int ltype, enum tdb_lock_flags flags,
				 TDB_ADJLOCK_START_DECL size_t off, size_t len)
{
	int ret;
	enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);

	if (len <= 4) {
		/* Single record. Just do blocking lock. */
		return TDB_ADJLOCK(tdb, ltype, start, off, len, flags);
	}

	/* First we try non-blocking. */
	ret = TDB_ADJLOCK(tdb, ltype, start, off, len, nb_flags);
	if (ret == 0) {
		return 0;
	}

	/* Try locking first half, then second. */
	ret = TDB_CHAINLOCK_GRADUAL(tdb, ltype, flags, start, off, len / 2);
	if (ret == -1)
		return -1;

	ret = TDB_CHAINLOCK_GRADUAL(tdb, ltype, flags, start,
				    off + len / 2, len - len / 2);
	if (ret == -1) {
		/* Undo the first half so we don't leak a partial range. */
		TDB_ADJUNLOCK(tdb, ltype, start, off, len / 2);
		return -1;
	}
	return 0;
}
695
/* lock/unlock entire database. It can only be upgradable if you have some
 * other way of guaranteeing exclusivity (ie. transaction write lock).
 * We do the locking gradually to avoid being starved by smaller locks. */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
		       enum tdb_lock_flags flags, bool upgradable)
{
	/* -1: error; 0: already held, just nested; fall through: take it. */
	switch (tdb_allrecord_check(tdb, ltype, flags, upgradable)) {
	case -1:
		return -1;
	case 0:
		return 0;
	}

	/* We cover two kinds of locks:
	 * 1) Normal chain locks. Taken for almost all operations.
	 * 2) Individual records locks. Taken after normal or free
	 *    chain locks.
	 *
	 * It is (1) which cause the starvation problem, so we're only
	 * gradual for that. */
	if (TDB_CHAINLOCK_GRADUAL(tdb, ltype, flags, FREELIST_TOP, FREELIST_TOP,
				  tdb->header.hash_size * 4) == -1) {
		return -1;
	}

	/* Grab individual record locks. */
	if (TDB_ADJLOCK(tdb, ltype, FREELIST_TOP, lock_offset(tdb->header.hash_size), 0,
			flags) == -1) {
		tdb_brunlock(tdb, ltype, FREELIST_TOP,
			     tdb->header.hash_size * 4);
		return -1;
	}

	tdb->allrecord_lock.count = 1;
	/* If it's upgradable, it's actually exclusive so we can treat
	 * it as a write lock. */
	tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
	tdb->allrecord_lock.off = upgradable;

	if (tdb_needs_recovery(tdb)) {
		bool mark = flags & TDB_LOCK_MARK_ONLY;
		tdb_allrecord_unlock(tdb, ltype, mark);
		if (mark) {
			tdb->ecode = TDB_ERR_LOCK;
			TDB_LOG((tdb, TDB_DEBUG_ERROR,
				 "tdb_lockall_mark cannot do recovery\n"));
			return -1;
		}
		if (tdb_lock_and_recover(tdb) == -1) {
			return -1;
		}
		return tdb_allrecord_lock(tdb, ltype, flags, upgradable);
	}

	return 0;
}
752
753
754
/* unlock entire db */
int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype, bool mark_lock)
{
	/* There are no locks on read-only dbs */
	if (tdb->read_only || tdb->traverse_read) {
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	/* Unlocking a lock we never took is a caller bug. */
	if (tdb->allrecord_lock.count == 0) {
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	/* Upgradable locks are marked as write locks, so releasing an
	 * upgradable lock with F_RDLCK is also accepted. */
	if (tdb->allrecord_lock.ltype != ltype
	    && (!tdb->allrecord_lock.off || ltype != F_RDLCK)) {
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	/* Nested: just drop one reference. */
	if (tdb->allrecord_lock.count > 1) {
		tdb->allrecord_lock.count--;
		return 0;
	}

	/* len 0 means unlock to end of file. */
	if (!mark_lock && tdb_brunlock(tdb, ltype, FREELIST_TOP, 0)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
		return -1;
	}

	tdb->allrecord_lock.count = 0;
	tdb->allrecord_lock.ltype = 0;

	return 0;
}
791
792/* lock entire database with write lock */
793_PUBLIC_ int tdb_lockall(struct tdb_context *tdb)
794{
795 tdb_trace(tdb, "tdb_lockall");
796 return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
797}
798
799/* lock entire database with write lock - mark only */
800_PUBLIC_ int tdb_lockall_mark(struct tdb_context *tdb)
801{
802 tdb_trace(tdb, "tdb_lockall_mark");
803 return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_MARK_ONLY, false);
804}
805
806/* unlock entire database with write lock - unmark only */
807_PUBLIC_ int tdb_lockall_unmark(struct tdb_context *tdb)
808{
809 tdb_trace(tdb, "tdb_lockall_unmark");
810 return tdb_allrecord_unlock(tdb, F_WRLCK, true);
811}
812
813/* lock entire database with write lock - nonblocking varient */
814_PUBLIC_ int tdb_lockall_nonblock(struct tdb_context *tdb)
815{
816 int ret = tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_NOWAIT, false);
817 tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
818 return ret;
819}
820
821/* unlock entire database with write lock */
822_PUBLIC_ int tdb_unlockall(struct tdb_context *tdb)
823{
824 tdb_trace(tdb, "tdb_unlockall");
825 return tdb_allrecord_unlock(tdb, F_WRLCK, false);
826}
827
828/* lock entire database with read lock */
829_PUBLIC_ int tdb_lockall_read(struct tdb_context *tdb)
830{
831 tdb_trace(tdb, "tdb_lockall_read");
832 return tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false);
833}
834
835/* lock entire database with read lock - nonblock varient */
836_PUBLIC_ int tdb_lockall_read_nonblock(struct tdb_context *tdb)
837{
838 int ret = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_NOWAIT, false);
839 tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
840 return ret;
841}
842
843/* unlock entire database with read lock */
844_PUBLIC_ int tdb_unlockall_read(struct tdb_context *tdb)
845{
846 tdb_trace(tdb, "tdb_unlockall_read");
847 return tdb_allrecord_unlock(tdb, F_RDLCK, false);
848}
849
850/* lock/unlock one hash chain. This is meant to be used to reduce
851 contention - it cannot guarantee how many records will be locked */
852_PUBLIC_ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
853{
854 int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
855 tdb_trace_1rec(tdb, "tdb_chainlock", key);
856 return ret;
857}
858
859/* lock/unlock one hash chain, non-blocking. This is meant to be used
860 to reduce contention - it cannot guarantee how many records will be
861 locked */
862_PUBLIC_ int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
863{
864 int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
865 tdb_trace_1rec_ret(tdb, "tdb_chainlock_nonblock", key, ret);
866 return ret;
867}
868
869/* mark a chain as locked without actually locking it. Warning! use with great caution! */
870_PUBLIC_ int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
871{
872 int ret = tdb_nest_lock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))),
873 F_WRLCK, TDB_LOCK_MARK_ONLY);
874 tdb_trace_1rec(tdb, "tdb_chainlock_mark", key);
875 return ret;
876}
877
878/* unmark a chain as locked without actually locking it. Warning! use with great caution! */
879_PUBLIC_ int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
880{
881 tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key);
882 return tdb_nest_unlock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))),
883 F_WRLCK, true);
884}
885
886_PUBLIC_ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
887{
888 tdb_trace_1rec(tdb, "tdb_chainunlock", key);
889 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
890}
891
892_PUBLIC_ int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
893{
894 int ret;
895 ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
896 tdb_trace_1rec(tdb, "tdb_chainlock_read", key);
897 return ret;
898}
899
900_PUBLIC_ int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
901{
902 tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
903 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
904}
905
906/* record lock stops delete underneath */
907int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
908{
909 if (tdb->allrecord_lock.count) {
910 return 0;
911 }
912 return off ? tdb_brlock(tdb, F_RDLCK, off, 1, TDB_LOCK_WAIT) : 0;
913}
914
915/*
916 Write locks override our own fcntl readlocks, so check it here.
917 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
918 an error to fail to get the lock here.
919*/
920int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
921{
922 struct tdb_traverse_lock *i;
923 for (i = &tdb->travlocks; i; i = i->next)
924 if (i->off == off)
925 return -1;
926 if (tdb->allrecord_lock.count) {
927 if (tdb->allrecord_lock.ltype == F_WRLCK) {
928 return 0;
929 }
930 return -1;
931 }
932 return tdb_brlock(tdb, F_WRLCK, off, 1, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
933}
934
935int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
936{
937 if (tdb->allrecord_lock.count) {
938 return 0;
939 }
940 return tdb_brunlock(tdb, F_WRLCK, off, 1);
941}
942
943/* fcntl locks don't stack: avoid unlocking someone else's */
944int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
945{
946 struct tdb_traverse_lock *i;
947 uint32_t count = 0;
948
949 if (tdb->allrecord_lock.count) {
950 return 0;
951 }
952
953 if (off == 0)
954 return 0;
955 for (i = &tdb->travlocks; i; i = i->next)
956 if (i->off == off)
957 count++;
958 return (count == 1 ? tdb_brunlock(tdb, F_RDLCK, off, 1) : 0);
959}
960
961bool tdb_have_extra_locks(struct tdb_context *tdb)
962{
963 unsigned int extra = tdb->num_lockrecs;
964
965 /* A transaction holds the lock for all records. */
966 if (!tdb->transaction && tdb->allrecord_lock.count) {
967 return true;
968 }
969
970 /* We always hold the active lock if CLEAR_IF_FIRST. */
971 if (find_nestlock(tdb, ACTIVE_LOCK)) {
972 extra--;
973 }
974
975 /* In a transaction, we expect to hold the transaction lock */
976 if (tdb->transaction && find_nestlock(tdb, TRANSACTION_LOCK)) {
977 extra--;
978 }
979
980 return extra;
981}
982
/* The transaction code uses this to remove all locks. */
void tdb_release_transaction_locks(struct tdb_context *tdb)
{
	unsigned int i, active = 0;

	/* Drop the allrecord lock if the transaction held one. */
	if (tdb->allrecord_lock.count != 0) {
		tdb_brunlock(tdb, tdb->allrecord_lock.ltype, FREELIST_TOP, 0);
		tdb->allrecord_lock.count = 0;
	}

	/* Release every nest lock except the active lock, compacting the
	 * surviving entries to the front of the array. */
	for (i=0;i<tdb->num_lockrecs;i++) {
		struct tdb_lock_type *lck = &tdb->lockrecs[i];

		/* Don't release the active lock! Copy it to first entry. */
		if (lck->off == ACTIVE_LOCK) {
			tdb->lockrecs[active++] = *lck;
		} else {
			tdb_brunlock(tdb, lck->ltype, lck->off, 1);
		}
	}
	tdb->num_lockrecs = active;
	if (tdb->num_lockrecs == 0) {
		SAFE_FREE(tdb->lockrecs);
	}
}
Note: See TracBrowser for help on using the repository browser.