source: trunk/samba/source/tdb/common/transaction.c @ 30

Last change on this file since 30 was 30, checked in by Paul Smedley, 14 years ago

Code updated to Samba 3.0.25rc2 level

File size: 31.7 KB
Line 
1 /*
2   Unix SMB/CIFS implementation.
3
4   trivial database library
5
6   Copyright (C) Andrew Tridgell              2005
7
8     ** NOTE! The following LGPL license applies to the tdb
9     ** library. This does NOT imply that all of Samba is released
10     ** under the LGPL
11   
12   This library is free software; you can redistribute it and/or
13   modify it under the terms of the GNU Lesser General Public
14   License as published by the Free Software Foundation; either
15   version 2 of the License, or (at your option) any later version.
16
17   This library is distributed in the hope that it will be useful,
18   but WITHOUT ANY WARRANTY; without even the implied warranty of
19   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20   Lesser General Public License for more details.
21
22   You should have received a copy of the GNU Lesser General Public
23   License along with this library; if not, write to the Free Software
24   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25*/
26
27#include "tdb_private.h"
28
29/*
30  transaction design:
31
32  - only allow a single transaction at a time per database. This makes
33    using the transaction API simpler, as otherwise the caller would
34    have to cope with temporary failures in transactions that conflict
35    with other current transactions
36
37  - keep the transaction recovery information in the same file as the
38    database, using a special 'transaction recovery' record pointed at
39    by the header. This removes the need for extra journal files as
40    used by some other databases
41
42  - dynamically allocate the transaction recovery record, re-using it
43    for subsequent transactions. If a larger record is needed then
44    tdb_free() the old record to place it on the normal tdb freelist
45    before allocating the new record
46
47  - during transactions, keep a linked list of all writes that have
48    been performed by intercepting all tdb_write() calls. The hooked
49    transaction versions of tdb_read() and tdb_write() check this
50    linked list and try to use the elements of the list in preference
51    to the real database.
52
53  - don't allow any locks to be held when a transaction starts,
54    otherwise we can end up with deadlock (plus lack of lock nesting
55    in posix locks would mean the lock is lost)
56
57  - if the caller gains a lock during the transaction but doesn't
58    release it then fail the commit
59
60  - allow for nested calls to tdb_transaction_start(), re-using the
61    existing transaction record. If the inner transaction is cancelled
62    then a subsequent commit will fail
63 
64  - keep a mirrored copy of the tdb hash chain heads to allow for the
65    fast hash heads scan on traverse, updating the mirrored copy in
66    the transaction version of tdb_write
67
68  - allow callers to mix transaction and non-transaction use of tdb,
69    although once a transaction is started then an exclusive lock is
70    gained until the transaction is committed or cancelled
71
72  - the commit strategy involves first saving away all modified data
73    into a linearised buffer in the transaction recovery area, then
74    marking the transaction recovery area with a magic value to
75    indicate a valid recovery record. In total 4 fsync/msync calls are
76    needed per commit to prevent race conditions. It might be possible
77    to reduce this to 3 or even 2 with some more work.
78
79  - check for a valid recovery record on open of the tdb, while the
80    global lock is held. Automatically recover from the transaction
81    recovery area if needed, then continue with the open as
82    usual. This allows for smooth crash recovery with no administrator
83    intervention.
84
85  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86    still available, but no transaction recovery area is used and no
87    fsync/msync calls are made.
88
89*/
90
91int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
92                       int rw_type, int lck_type, int probe, size_t len);
93
94struct tdb_transaction_el {
95        struct tdb_transaction_el *next, *prev;
96        tdb_off_t offset;
97        tdb_len_t length;
98        unsigned char *data;
99};
100
101/*
102  hold the context of any current transaction
103*/
104struct tdb_transaction {
105        /* we keep a mirrored copy of the tdb hash heads here so
106           tdb_next_hash_chain() can operate efficiently */
107        u32 *hash_heads;
108
109        /* the original io methods - used to do IOs to the real db */
110        const struct tdb_methods *io_methods;
111
112        /* the list of transaction elements. We use a doubly linked
113           list with a last pointer to allow us to keep the list
114           ordered, with first element at the front of the list. It
115           needs to be doubly linked as the read/write traversals need
116           to be backwards, while the commit needs to be forwards */
117        struct tdb_transaction_el *elements, *elements_last;
118
119        /* non-zero when an internal transaction error has
120           occurred. All write operations will then fail until the
121           transaction is ended */
122        int transaction_error;
123
124        /* when inside a transaction we need to keep track of any
125           nested tdb_transaction_start() calls, as these are allowed,
126           but don't create a new transaction */
127        int nesting;
128
129        /* old file size before transaction */
130        tdb_len_t old_map_size;
131};
132
133
134/*
135  read while in a transaction. We need to check first if the data is in our list
136  of transaction elements, then if not do a real read
137*/
138static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
139                            tdb_len_t len, int cv)
140{
141        struct tdb_transaction_el *el;
142
143        /* we need to walk the list backwards to get the most recent data */
144        for (el=tdb->transaction->elements_last;el;el=el->prev) {
145                tdb_len_t partial;
146
147                if (off+len <= el->offset) {
148                        continue;
149                }
150                if (off >= el->offset + el->length) {
151                        continue;
152                }
153
154                /* an overlapping read - needs to be split into up to
155                   2 reads and a memcpy */
156                if (off < el->offset) {
157                        partial = el->offset - off;
158                        if (transaction_read(tdb, off, buf, partial, cv) != 0) {
159                                goto fail;
160                        }
161                        len -= partial;
162                        off += partial;
163                        buf = (void *)(partial + (char *)buf);
164                }
165                if (off + len <= el->offset + el->length) {
166                        partial = len;
167                } else {
168                        partial = el->offset + el->length - off;
169                }
170                memcpy(buf, el->data + (off - el->offset), partial);
171                if (cv) {
172                        tdb_convert(buf, len);
173                }
174                len -= partial;
175                off += partial;
176                buf = (void *)(partial + (char *)buf);
177               
178                if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
179                        goto fail;
180                }
181
182                return 0;
183        }
184
185        /* its not in the transaction elements - do a real read */
186        return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
187
188fail:
189        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
190        tdb->ecode = TDB_ERR_IO;
191        tdb->transaction->transaction_error = 1;
192        return -1;
193}
194
195
196/*
197  write while in a transaction
198*/
199static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
200                             const void *buf, tdb_len_t len)
201{
202        struct tdb_transaction_el *el, *best_el=NULL;
203
204        if (len == 0) {
205                return 0;
206        }
207       
208        /* if the write is to a hash head, then update the transaction
209           hash heads */
210        if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
211            off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
212                u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
213                memcpy(&tdb->transaction->hash_heads[chain], buf, len);
214        }
215
216        /* first see if we can replace an existing entry */
217        for (el=tdb->transaction->elements_last;el;el=el->prev) {
218                tdb_len_t partial;
219
220                if (best_el == NULL && off == el->offset+el->length) {
221                        best_el = el;
222                }
223
224                if (off+len <= el->offset) {
225                        continue;
226                }
227                if (off >= el->offset + el->length) {
228                        continue;
229                }
230
231                /* an overlapping write - needs to be split into up to
232                   2 writes and a memcpy */
233                if (off < el->offset) {
234                        partial = el->offset - off;
235                        if (transaction_write(tdb, off, buf, partial) != 0) {
236                                goto fail;
237                        }
238                        len -= partial;
239                        off += partial;
240                        buf = (const void *)(partial + (const char *)buf);
241                }
242                if (off + len <= el->offset + el->length) {
243                        partial = len;
244                } else {
245                        partial = el->offset + el->length - off;
246                }
247                memcpy(el->data + (off - el->offset), buf, partial);
248                len -= partial;
249                off += partial;
250                buf = (const void *)(partial + (const char *)buf);
251               
252                if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
253                        goto fail;
254                }
255
256                return 0;
257        }
258
259        /* see if we can append the new entry to an existing entry */
260        if (best_el && best_el->offset + best_el->length == off && 
261            (off+len < tdb->transaction->old_map_size ||
262             off > tdb->transaction->old_map_size)) {
263                unsigned char *data = best_el->data;
264                el = best_el;
265                el->data = (unsigned char *)realloc(el->data,
266                                                    el->length + len);
267                if (el->data == NULL) {
268                        tdb->ecode = TDB_ERR_OOM;
269                        tdb->transaction->transaction_error = 1;
270                        el->data = data;
271                        return -1;
272                }
273                if (buf) {
274                        memcpy(el->data + el->length, buf, len);
275                } else {
276                        memset(el->data + el->length, TDB_PAD_BYTE, len);
277                }
278                el->length += len;
279                return 0;
280        }
281
282        /* add a new entry at the end of the list */
283        el = (struct tdb_transaction_el *)malloc(sizeof(*el));
284        if (el == NULL) {
285                tdb->ecode = TDB_ERR_OOM;
286                tdb->transaction->transaction_error = 1;               
287                return -1;
288        }
289        el->next = NULL;
290        el->prev = tdb->transaction->elements_last;
291        el->offset = off;
292        el->length = len;
293        el->data = (unsigned char *)malloc(len);
294        if (el->data == NULL) {
295                free(el);
296                tdb->ecode = TDB_ERR_OOM;
297                tdb->transaction->transaction_error = 1;               
298                return -1;
299        }
300        if (buf) {
301                memcpy(el->data, buf, len);
302        } else {
303                memset(el->data, TDB_PAD_BYTE, len);
304        }
305        if (el->prev) {
306                el->prev->next = el;
307        } else {
308                tdb->transaction->elements = el;
309        }
310        tdb->transaction->elements_last = el;
311        return 0;
312
313fail:
314        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
315        tdb->ecode = TDB_ERR_IO;
316        tdb->transaction->transaction_error = 1;
317        return -1;
318}
319
320/*
321  accelerated hash chain head search, using the cached hash heads
322*/
323static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
324{
325        u32 h = *chain;
326        for (;h < tdb->header.hash_size;h++) {
327                /* the +1 takes account of the freelist */
328                if (0 != tdb->transaction->hash_heads[h+1]) {
329                        break;
330                }
331        }
332        (*chain) = h;
333}
334
335/*
336  out of bounds check during a transaction
337*/
338static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
339{
340        if (len <= tdb->map_size) {
341                return 0;
342        }
343        return TDB_ERRCODE(TDB_ERR_IO, -1);
344}
345
346/*
347  transaction version of tdb_expand().
348*/
349static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
350                                   tdb_off_t addition)
351{
352        /* add a write to the transaction elements, so subsequent
353           reads see the zero data */
354        if (transaction_write(tdb, size, NULL, addition) != 0) {
355                return -1;
356        }
357
358        return 0;
359}
360
361/*
362  brlock during a transaction - ignore them
363*/
364int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
365                       int rw_type, int lck_type, int probe, size_t len)
366{
367        return 0;
368}
369
370static const struct tdb_methods transaction_methods = {
371        transaction_read,
372        transaction_write,
373        transaction_next_hash_chain,
374        transaction_oob,
375        transaction_expand_file,
376        transaction_brlock
377};
378
379
380/*
381  start a tdb transaction. No token is returned, as only a single
382  transaction is allowed to be pending per tdb_context
383*/
384int tdb_transaction_start(struct tdb_context *tdb)
385{
386        /* some sanity checks */
387        if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
388                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
389                tdb->ecode = TDB_ERR_EINVAL;
390                return -1;
391        }
392
393        /* cope with nested tdb_transaction_start() calls */
394        if (tdb->transaction != NULL) {
395                tdb->transaction->nesting++;
396                TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
397                         tdb->transaction->nesting));
398                return 0;
399        }
400
401        if (tdb->num_locks != 0 || tdb->global_lock.count) {
402                /* the caller must not have any locks when starting a
403                   transaction as otherwise we'll be screwed by lack
404                   of nested locks in posix */
405                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
406                tdb->ecode = TDB_ERR_LOCK;
407                return -1;
408        }
409
410        if (tdb->travlocks.next != NULL) {
411                /* you cannot use transactions inside a traverse (although you can use
412                   traverse inside a transaction) as otherwise you can end up with
413                   deadlock */
414                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
415                tdb->ecode = TDB_ERR_LOCK;
416                return -1;
417        }
418
419        tdb->transaction = (struct tdb_transaction *)
420                calloc(sizeof(struct tdb_transaction), 1);
421        if (tdb->transaction == NULL) {
422                tdb->ecode = TDB_ERR_OOM;
423                return -1;
424        }
425
426        /* get the transaction write lock. This is a blocking lock. As
427           discussed with Volker, there are a number of ways we could
428           make this async, which we will probably do in the future */
429        if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
430                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
431                tdb->ecode = TDB_ERR_LOCK;
432                SAFE_FREE(tdb->transaction);
433                return -1;
434        }
435       
436        /* get a read lock from the freelist to the end of file. This
437           is upgraded to a write lock during the commit */
438        if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
439                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
440                tdb->ecode = TDB_ERR_LOCK;
441                goto fail;
442        }
443
444        /* setup a copy of the hash table heads so the hash scan in
445           traverse can be fast */
446        tdb->transaction->hash_heads = (u32 *)
447                calloc(tdb->header.hash_size+1, sizeof(u32));
448        if (tdb->transaction->hash_heads == NULL) {
449                tdb->ecode = TDB_ERR_OOM;
450                goto fail;
451        }
452        if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
453                                   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
454                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
455                tdb->ecode = TDB_ERR_IO;
456                goto fail;
457        }
458
459        /* make sure we know about any file expansions already done by
460           anyone else */
461        tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
462        tdb->transaction->old_map_size = tdb->map_size;
463
464        /* finally hook the io methods, replacing them with
465           transaction specific methods */
466        tdb->transaction->io_methods = tdb->methods;
467        tdb->methods = &transaction_methods;
468
469        /* by calling this transaction write here, we ensure that we don't grow the
470           transaction linked list due to hash table updates */
471        if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
472                              TDB_HASHTABLE_SIZE(tdb)) != 0) {
473                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
474                tdb->ecode = TDB_ERR_IO;
475                goto fail;
476        }
477
478        return 0;
479       
480fail:
481        tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
482        tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
483        SAFE_FREE(tdb->transaction->hash_heads);
484        SAFE_FREE(tdb->transaction);
485        return -1;
486}
487
488
489/*
490  cancel the current transaction
491*/
492int tdb_transaction_cancel(struct tdb_context *tdb)
493{       
494        if (tdb->transaction == NULL) {
495                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
496                return -1;
497        }
498
499        if (tdb->transaction->nesting != 0) {
500                tdb->transaction->transaction_error = 1;
501                tdb->transaction->nesting--;
502                return 0;
503        }               
504
505        tdb->map_size = tdb->transaction->old_map_size;
506
507        /* free all the transaction elements */
508        while (tdb->transaction->elements) {
509                struct tdb_transaction_el *el = tdb->transaction->elements;
510                tdb->transaction->elements = el->next;
511                free(el->data);
512                free(el);
513        }
514
515        /* remove any global lock created during the transaction */
516        if (tdb->global_lock.count != 0) {
517                tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
518                tdb->global_lock.count = 0;
519        }
520
521        /* remove any locks created during the transaction */
522        if (tdb->num_locks != 0) {
523                int i;
524                for (i=0;i<tdb->num_lockrecs;i++) {
525                        tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
526                                   F_UNLCK,F_SETLKW, 0, 1);
527                }
528                tdb->num_locks = 0;
529                tdb->num_lockrecs = 0;
530                SAFE_FREE(tdb->lockrecs);
531        }
532
533        /* restore the normal io methods */
534        tdb->methods = tdb->transaction->io_methods;
535
536        tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
537        tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
538        SAFE_FREE(tdb->transaction->hash_heads);
539        SAFE_FREE(tdb->transaction);
540       
541        return 0;
542}
543
544/*
545  sync to disk
546*/
547static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
548{       
549        if (fsync(tdb->fd) != 0) {
550                tdb->ecode = TDB_ERR_IO;
551                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
552                return -1;
553        }
554#ifdef MS_SYNC
555        if (tdb->map_ptr) {
556                tdb_off_t moffset = offset & ~(tdb->page_size-1);
557                if (msync(moffset + (char *)tdb->map_ptr, 
558                          length + (offset - moffset), MS_SYNC) != 0) {
559                        tdb->ecode = TDB_ERR_IO;
560                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
561                                 strerror(errno)));
562                        return -1;
563                }
564        }
565#endif
566        return 0;
567}
568
569
570/*
571  work out how much space the linearised recovery data will consume
572*/
573static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
574{
575        struct tdb_transaction_el *el;
576        tdb_len_t recovery_size = 0;
577
578        recovery_size = sizeof(u32);
579        for (el=tdb->transaction->elements;el;el=el->next) {
580                if (el->offset >= tdb->transaction->old_map_size) {
581                        continue;
582                }
583                recovery_size += 2*sizeof(tdb_off_t) + el->length;
584        }
585
586        return recovery_size;
587}
588
589/*
590  allocate the recovery area, or use an existing recovery area if it is
591  large enough
592*/
593static int tdb_recovery_allocate(struct tdb_context *tdb, 
594                                 tdb_len_t *recovery_size,
595                                 tdb_off_t *recovery_offset,
596                                 tdb_len_t *recovery_max_size)
597{
598        struct list_struct rec;
599        const struct tdb_methods *methods = tdb->transaction->io_methods;
600        tdb_off_t recovery_head;
601
602        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
603                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
604                return -1;
605        }
606
607        rec.rec_len = 0;
608
609        if (recovery_head != 0 && 
610            methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
611                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
612                return -1;
613        }
614
615        *recovery_size = tdb_recovery_size(tdb);
616
617        if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
618                /* it fits in the existing area */
619                *recovery_max_size = rec.rec_len;
620                *recovery_offset = recovery_head;
621                return 0;
622        }
623
624        /* we need to free up the old recovery area, then allocate a
625           new one at the end of the file. Note that we cannot use
626           tdb_allocate() to allocate the new one as that might return
627           us an area that is being currently used (as of the start of
628           the transaction) */
629        if (recovery_head != 0) {
630                if (tdb_free(tdb, recovery_head, &rec) == -1) {
631                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
632                        return -1;
633                }
634        }
635
636        /* the tdb_free() call might have increased the recovery size */
637        *recovery_size = tdb_recovery_size(tdb);
638
639        /* round up to a multiple of page size */
640        *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
641        *recovery_offset = tdb->map_size;
642        recovery_head = *recovery_offset;
643
644        if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
645                                     (tdb->map_size - tdb->transaction->old_map_size) +
646                                     sizeof(rec) + *recovery_max_size) == -1) {
647                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
648                return -1;
649        }
650
651        /* remap the file (if using mmap) */
652        methods->tdb_oob(tdb, tdb->map_size + 1, 1);
653
654        /* we have to reset the old map size so that we don't try to expand the file
655           again in the transaction commit, which would destroy the recovery area */
656        tdb->transaction->old_map_size = tdb->map_size;
657
658        /* write the recovery header offset and sync - we can sync without a race here
659           as the magic ptr in the recovery record has not been set */
660        CONVERT(recovery_head);
661        if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
662                               &recovery_head, sizeof(tdb_off_t)) == -1) {
663                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
664                return -1;
665        }
666
667        return 0;
668}
669
670
671/*
672  setup the recovery data that will be used on a crash during commit
673*/
674static int transaction_setup_recovery(struct tdb_context *tdb, 
675                                      tdb_off_t *magic_offset)
676{
677        struct tdb_transaction_el *el;
678        tdb_len_t recovery_size;
679        unsigned char *data, *p;
680        const struct tdb_methods *methods = tdb->transaction->io_methods;
681        struct list_struct *rec;
682        tdb_off_t recovery_offset, recovery_max_size;
683        tdb_off_t old_map_size = tdb->transaction->old_map_size;
684        u32 magic, tailer;
685
686        /*
687          check that the recovery area has enough space
688        */
689        if (tdb_recovery_allocate(tdb, &recovery_size, 
690                                  &recovery_offset, &recovery_max_size) == -1) {
691                return -1;
692        }
693
694        data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
695        if (data == NULL) {
696                tdb->ecode = TDB_ERR_OOM;
697                return -1;
698        }
699
700        rec = (struct list_struct *)data;
701        memset(rec, 0, sizeof(*rec));
702
703        rec->magic    = 0;
704        rec->data_len = recovery_size;
705        rec->rec_len  = recovery_max_size;
706        rec->key_len  = old_map_size;
707        CONVERT(rec);
708
709        /* build the recovery data into a single blob to allow us to do a single
710           large write, which should be more efficient */
711        p = data + sizeof(*rec);
712        for (el=tdb->transaction->elements;el;el=el->next) {
713                if (el->offset >= old_map_size) {
714                        continue;
715                }
716                if (el->offset + el->length > tdb->transaction->old_map_size) {
717                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
718                        free(data);
719                        tdb->ecode = TDB_ERR_CORRUPT;
720                        return -1;
721                }
722                memcpy(p, &el->offset, 4);
723                memcpy(p+4, &el->length, 4);
724                if (DOCONV()) {
725                        tdb_convert(p, 8);
726                }
727                /* the recovery area contains the old data, not the
728                   new data, so we have to call the original tdb_read
729                   method to get it */
730                if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
731                        free(data);
732                        tdb->ecode = TDB_ERR_IO;
733                        return -1;
734                }
735                p += 8 + el->length;
736        }
737
738        /* and the tailer */
739        tailer = sizeof(*rec) + recovery_max_size;
740        memcpy(p, &tailer, 4);
741        CONVERT(p);
742
743        /* write the recovery data to the recovery area */
744        if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
745                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
746                free(data);
747                tdb->ecode = TDB_ERR_IO;
748                return -1;
749        }
750
751        /* as we don't have ordered writes, we have to sync the recovery
752           data before we update the magic to indicate that the recovery
753           data is present */
754        if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
755                free(data);
756                return -1;
757        }
758
759        free(data);
760
761        magic = TDB_RECOVERY_MAGIC;
762        CONVERT(magic);
763
764        *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
765
766        if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
767                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
768                tdb->ecode = TDB_ERR_IO;
769                return -1;
770        }
771
772        /* ensure the recovery magic marker is on disk */
773        if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
774                return -1;
775        }
776
777        return 0;
778}
779
780/*
781  commit the current transaction
782*/
783int tdb_transaction_commit(struct tdb_context *tdb)
784{       
785        const struct tdb_methods *methods;
786        tdb_off_t magic_offset = 0;
787        u32 zero = 0;
788
789        if (tdb->transaction == NULL) {
790                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
791                return -1;
792        }
793
794        if (tdb->transaction->transaction_error) {
795                tdb->ecode = TDB_ERR_IO;
796                tdb_transaction_cancel(tdb);
797                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
798                return -1;
799        }
800
801        if (tdb->transaction->nesting != 0) {
802                tdb->transaction->nesting--;
803                return 0;
804        }               
805
806        /* check for a null transaction */
807        if (tdb->transaction->elements == NULL) {
808                tdb_transaction_cancel(tdb);
809                return 0;
810        }
811
812        methods = tdb->transaction->io_methods;
813       
814        /* if there are any locks pending then the caller has not
815           nested their locks properly, so fail the transaction */
816        if (tdb->num_locks || tdb->global_lock.count) {
817                tdb->ecode = TDB_ERR_LOCK;
818                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
819                tdb_transaction_cancel(tdb);
820                return -1;
821        }
822
823        /* upgrade the main transaction lock region to a write lock */
824        if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
825                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
826                tdb->ecode = TDB_ERR_LOCK;
827                tdb_transaction_cancel(tdb);
828                return -1;
829        }
830
831        /* get the global lock - this prevents new users attaching to the database
832           during the commit */
833        if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
834                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
835                tdb->ecode = TDB_ERR_LOCK;
836                tdb_transaction_cancel(tdb);
837                return -1;
838        }
839
840        if (!(tdb->flags & TDB_NOSYNC)) {
841                /* write the recovery data to the end of the file */
842                if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
843                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
844                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
845                        tdb_transaction_cancel(tdb);
846                        return -1;
847                }
848        }
849
850        /* expand the file to the new size if needed */
851        if (tdb->map_size != tdb->transaction->old_map_size) {
852                if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
853                                             tdb->map_size - 
854                                             tdb->transaction->old_map_size) == -1) {
855                        tdb->ecode = TDB_ERR_IO;
856                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
857                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
858                        tdb_transaction_cancel(tdb);
859                        return -1;
860                }
861                tdb->map_size = tdb->transaction->old_map_size;
862                methods->tdb_oob(tdb, tdb->map_size + 1, 1);
863        }
864
865        /* perform all the writes */
866        while (tdb->transaction->elements) {
867                struct tdb_transaction_el *el = tdb->transaction->elements;
868
869                if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
870                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
871                       
872                        /* we've overwritten part of the data and
873                           possibly expanded the file, so we need to
874                           run the crash recovery code */
875                        tdb->methods = methods;
876                        tdb_transaction_recover(tdb); 
877
878                        tdb_transaction_cancel(tdb);
879                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
880
881                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
882                        return -1;
883                }
884                tdb->transaction->elements = el->next;
885                free(el->data); 
886                free(el);
887        } 
888
889        if (!(tdb->flags & TDB_NOSYNC)) {
890                /* ensure the new data is on disk */
891                if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
892                        return -1;
893                }
894
895                /* remove the recovery marker */
896                if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
897                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
898                        return -1;
899                }
900
901                /* ensure the recovery marker has been removed on disk */
902                if (transaction_sync(tdb, magic_offset, 4) == -1) {
903                        return -1;
904                }
905        }
906
907        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
908
909        /*
910          TODO: maybe write to some dummy hdr field, or write to magic
911          offset without mmap, before the last sync, instead of the
912          utime() call
913        */
914
915        /* on some systems (like Linux 2.6.x) changes via mmap/msync
916           don't change the mtime of the file, this means the file may
917           not be backed up (as tdb rounding to block sizes means that
918           file size changes are quite rare too). The following forces
919           mtime changes when a transaction completes */
920#ifdef HAVE_UTIME
921        utime(tdb->name, NULL);
922#endif
923
924        /* use a transaction cancel to free memory and remove the
925           transaction locks */
926        tdb_transaction_cancel(tdb);
927        return 0;
928}
929
930
931/*
932  recover from an aborted transaction. Must be called with exclusive
933  database write access already established (including the global
934  lock to prevent new processes attaching)
935*/
936int tdb_transaction_recover(struct tdb_context *tdb)
937{
938        tdb_off_t recovery_head, recovery_eof;
939        unsigned char *data, *p;
940        u32 zero = 0;
941        struct list_struct rec;
942
943        /* find the recovery area */
944        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
945                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
946                tdb->ecode = TDB_ERR_IO;
947                return -1;
948        }
949
950        if (recovery_head == 0) {
951                /* we have never allocated a recovery record */
952                return 0;
953        }
954
955        /* read the recovery record */
956        if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
957                                   sizeof(rec), DOCONV()) == -1) {
958                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
959                tdb->ecode = TDB_ERR_IO;
960                return -1;
961        }
962
963        if (rec.magic != TDB_RECOVERY_MAGIC) {
964                /* there is no valid recovery data */
965                return 0;
966        }
967
968        if (tdb->read_only) {
969                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
970                tdb->ecode = TDB_ERR_CORRUPT;
971                return -1;
972        }
973
974        recovery_eof = rec.key_len;
975
976        data = (unsigned char *)malloc(rec.data_len);
977        if (data == NULL) {
978                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
979                tdb->ecode = TDB_ERR_OOM;
980                return -1;
981        }
982
983        /* read the full recovery data */
984        if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
985                                   rec.data_len, 0) == -1) {
986                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
987                tdb->ecode = TDB_ERR_IO;
988                return -1;
989        }
990
991        /* recover the file data */
992        p = data;
993        while (p+8 < data + rec.data_len) {
994                u32 ofs, len;
995                if (DOCONV()) {
996                        tdb_convert(p, 8);
997                }
998                memcpy(&ofs, p, 4);
999                memcpy(&len, p+4, 4);
1000
1001                if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1002                        free(data);
1003                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1004                        tdb->ecode = TDB_ERR_IO;
1005                        return -1;
1006                }
1007                p += 8 + len;
1008        }
1009
1010        free(data);
1011
1012        if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1013                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1014                tdb->ecode = TDB_ERR_IO;
1015                return -1;
1016        }
1017
1018        /* if the recovery area is after the recovered eof then remove it */
1019        if (recovery_eof <= recovery_head) {
1020                if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1021                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1022                        tdb->ecode = TDB_ERR_IO;
1023                        return -1;                     
1024                }
1025        }
1026
1027        /* remove the recovery magic */
1028        if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1029                          &zero) == -1) {
1030                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1031                tdb->ecode = TDB_ERR_IO;
1032                return -1;                     
1033        }
1034       
1035        /* reduce the file size to the old size */
1036        tdb_munmap(tdb);
1037        if (ftruncate(tdb->fd, recovery_eof) != 0) {
1038                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1039                tdb->ecode = TDB_ERR_IO;
1040                return -1;                     
1041        }
1042        tdb->map_size = recovery_eof;
1043        tdb_mmap(tdb);
1044
1045        if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1046                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1047                tdb->ecode = TDB_ERR_IO;
1048                return -1;
1049        }
1050
1051        TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1052                 recovery_eof));
1053
1054        /* all done */
1055        return 0;
1056}
Note: See TracBrowser for help on using the repository browser.