FreeBSD ZFS
The Zettabyte File System

txg.c

Go to the documentation of this file.
00001 /*
00002  * CDDL HEADER START
00003  *
00004  * The contents of this file are subject to the terms of the
00005  * Common Development and Distribution License (the "License").
00006  * You may not use this file except in compliance with the License.
00007  *
00008  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
00009  * or http://www.opensolaris.org/os/licensing.
00010  * See the License for the specific language governing permissions
00011  * and limitations under the License.
00012  *
00013  * When distributing Covered Code, include this CDDL HEADER in each
00014  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
00015  * If applicable, add the following below this CDDL HEADER, with the
00016  * fields enclosed by brackets "[]" replaced with your own identifying
00017  * information: Portions Copyright [yyyy] [name of copyright owner]
00018  *
00019  * CDDL HEADER END
00020  */
00021 /*
00022  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
00023  * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org>
00024  * Copyright (c) 2012 by Delphix. All rights reserved.
00025  */
00026 
00027 #include <sys/zfs_context.h>
00028 #include <sys/txg_impl.h>
00029 #include <sys/dmu_impl.h>
00030 #include <sys/dmu_tx.h>
00031 #include <sys/dsl_pool.h>
00032 #include <sys/dsl_scan.h>
00033 #include <sys/callb.h>
00034 
/* Worker threads; started by txg_sync_start() and stopped by txg_sync_stop(). */
static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

/*
 * Maximum seconds' worth of dirty data to accumulate per txg: the sync
 * thread forces a sync once this much wall-clock time has elapsed since
 * the previous one.  Exposed as the vfs.zfs.txg.timeout tunable/sysctl.
 */
int zfs_txg_timeout = 5;

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
00055 
00059 void
00060 txg_init(dsl_pool_t *dp, uint64_t txg)
00061 {
00062         tx_state_t *tx = &dp->dp_tx;
00063         int c;
00064         bzero(tx, sizeof (tx_state_t));
00065 
00066         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
00067 
00068         for (c = 0; c < max_ncpus; c++) {
00069                 int i;
00070 
00071                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
00072                 for (i = 0; i < TXG_SIZE; i++) {
00073                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
00074                             NULL);
00075                         list_create(&tx->tx_cpu[c].tc_callbacks[i],
00076                             sizeof (dmu_tx_callback_t),
00077                             offsetof(dmu_tx_callback_t, dcb_node));
00078                 }
00079         }
00080 
00081         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
00082 
00083         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
00084         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
00085         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
00086         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
00087         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
00088 
00089         tx->tx_open_txg = txg;
00090 }
00091 
00095 void
00096 txg_fini(dsl_pool_t *dp)
00097 {
00098         tx_state_t *tx = &dp->dp_tx;
00099         int c;
00100 
00101         ASSERT(tx->tx_threads == 0);
00102 
00103         mutex_destroy(&tx->tx_sync_lock);
00104 
00105         cv_destroy(&tx->tx_sync_more_cv);
00106         cv_destroy(&tx->tx_sync_done_cv);
00107         cv_destroy(&tx->tx_quiesce_more_cv);
00108         cv_destroy(&tx->tx_quiesce_done_cv);
00109         cv_destroy(&tx->tx_exit_cv);
00110 
00111         for (c = 0; c < max_ncpus; c++) {
00112                 int i;
00113 
00114                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
00115                 for (i = 0; i < TXG_SIZE; i++) {
00116                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
00117                         list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
00118                 }
00119         }
00120 
00121         if (tx->tx_commit_cb_taskq != NULL)
00122                 taskq_destroy(tx->tx_commit_cb_taskq);
00123 
00124         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
00125 
00126         bzero(tx, sizeof (tx_state_t));
00127 }
00128 
/*
 * Start the txg machinery for pool dp: one quiesce thread and one sync
 * thread.  tx_threads is 2 for the lifetime of the pool's sync engine
 * (see the ASSERTs in txg_sync_stop() and the wait functions below).
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	/* Set the count before the threads run so they see consistent state. */
	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}
00158 
/*
 * Common entry for both worker threads: register with the CPR (suspend/
 * resume) framework and take tx_sync_lock, which the thread then holds
 * whenever it is not sleeping in txg_thread_wait() or off in spa_sync().
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}
00165 
/*
 * Common exit path for both worker threads.  Clears the caller's thread
 * pointer (*tpp), decrements tx_threads, and wakes txg_sync_stop(),
 * which waits on tx_exit_cv for the count to reach zero.  Must be
 * called with tx_sync_lock held; CALLB_CPR_EXIT releases it.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}
00176 
00177 static void
00178 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
00179 {
00180         CALLB_CPR_SAFE_BEGIN(cpr);
00181 
00182         if (time)
00183                 (void) cv_timedwait(cv, &tx->tx_sync_lock, time);
00184         else
00185                 cv_wait(cv, &tx->tx_sync_lock);
00186 
00187         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
00188 }
00189 
/*
 * Stop the txg worker threads for pool dp.  First drains outstanding
 * work (including the deferred-free txgs), then raises tx_exiting,
 * wakes both threads, and waits for each to run txg_thread_exit().
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	/* Wake both threads no matter which CV they are sleeping on. */
	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
00229 
/*
 * Take a hold on the currently open txg, preventing it from being
 * quiesced while the caller assigns work to it.  Fills in *th and
 * returns the open txg number.
 *
 * NOTE: returns with this CPU's tc_lock still held; the caller must
 * drop it via txg_rele_to_quiesce() and later release the hold itself
 * with txg_rele_to_sync().  The tc_count increment is what keeps
 * txg_quiesce() waiting once the lock is dropped.
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}
00247 
/*
 * Drop the per-CPU lock acquired in txg_hold_open().  The hold itself
 * (tc_count) remains in place until txg_rele_to_sync() is called.
 */
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}
00255 
00256 void
00257 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
00258 {
00259         tx_cpu_t *tc = th->th_cpu;
00260         int g = th->th_txg & TXG_MASK;
00261 
00262         mutex_enter(&tc->tc_lock);
00263         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
00264         mutex_exit(&tc->tc_lock);
00265 }
00266 
/*
 * Release the hold taken by txg_hold_open().  When the last hold on
 * this txg slot drops, wake the quiesce thread blocked in
 * txg_quiesce() waiting for tc_count to reach zero.
 */
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
00281 
/*
 * Quiesce transaction group txg: advance tx_open_txg so no new holds
 * can land on txg, then wait for every existing holder to call
 * txg_rele_to_sync().  Called from the quiesce thread with
 * tx_sync_lock dropped (see txg_quiesce_thread()).
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to call
	 * txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}
00323 
00324 static void
00325 txg_do_callbacks(void *arg)
00326 {
00327         list_t *cb_list = arg;
00328 
00329         dmu_tx_do_callbacks(cb_list, 0);
00330 
00331         list_destroy(cb_list);
00332 
00333         kmem_free(cb_list, sizeof (list_t));
00334 }
00335 
00342 static void
00343 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
00344 {
00345         int c;
00346         tx_state_t *tx = &dp->dp_tx;
00347         list_t *cb_list;
00348 
00349         for (c = 0; c < max_ncpus; c++) {
00350                 tx_cpu_t *tc = &tx->tx_cpu[c];
00351                 /*
00352                  * No need to lock tx_cpu_t at this point, since this can
00353                  * only be called once a txg has been synced.
00354                  */
00355 
00356                 int g = txg & TXG_MASK;
00357 
00358                 if (list_is_empty(&tc->tc_callbacks[g]))
00359                         continue;
00360 
00361                 if (tx->tx_commit_cb_taskq == NULL) {
00362                         /*
00363                          * Commit callback taskq hasn't been created yet.
00364                          */
00365                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
00366                             max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
00367                             TASKQ_PREPOPULATE);
00368                 }
00369 
00370                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
00371                 list_create(cb_list, sizeof (dmu_tx_callback_t),
00372                     offsetof(dmu_tx_callback_t, dcb_node));
00373 
00374                 list_move_tail(&tc->tc_callbacks[g], cb_list);
00375 
00376                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
00377                     txg_do_callbacks, cb_list, TQ_SLEEP);
00378         }
00379 }
00380 
/*
 * The sync thread's main loop.  Each iteration: (1) idle until there is
 * a reason to sync (scan activity, a waiter, a handed-off txg, or the
 * zfs_txg_timeout deadline); (2) obtain a quiesced txg from the quiesce
 * thread, prodding it if needed; (3) call spa_sync() on that txg with
 * tx_sync_lock dropped; (4) publish tx_synced_txg, wake waiters, and
 * dispatch commit callbacks.  Exits via txg_thread_exit() when
 * txg_sync_stop() sets tx_exiting.
 */
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			/* delta = time spent since the last sync started */
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		/* spa_sync() runs without tx_sync_lock held. */
		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}
00457 
/*
 * The quiesce thread's main loop.  Each iteration waits until someone
 * wants a newer open txg AND the previously quiesced txg (if any) has
 * been consumed by the sync thread, then quiesces the open txg via
 * txg_quiesce() and hands it off through tx_quiesced_txg.  Exits via
 * txg_thread_exit() when txg_sync_stop() sets tx_exiting.
 */
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/* txg_quiesce() blocks on holders, so drop tx_sync_lock. */
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
00502 
/*
 * Throttle the caller: sleep for up to "ticks" ticks before letting
 * more work into transaction group txg.  Returns immediately if txg is
 * no longer open, or its predecessor is already syncing or synced, or
 * the pipeline is stalled — in those cases delaying would serve no
 * purpose.  The open-txg check is repeated under tx_sync_lock because
 * the unlocked pre-check can race with the quiesce thread.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - ddi_get_lbolt());

	mutex_exit(&tx->tx_sync_lock);
}
00532 
/*
 * Block until transaction group txg has been synced to disk.
 * A txg of 0 means "the currently open txg plus TXG_DEFER_SIZE".
 * Requires both worker threads to be running (tx_threads == 2).
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		/* Re-broadcast each pass so a sleeping sync thread wakes up. */
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
00555 
/*
 * Block until transaction group txg is the open txg (i.e. all earlier
 * txgs have been quiesced).  A txg of 0 means "the txg after the
 * currently open one".  Requires both worker threads to be running.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		/* Nudge the quiesce thread, then wait for it to advance. */
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
00575 
00576 boolean_t
00577 txg_stalled(dsl_pool_t *dp)
00578 {
00579         tx_state_t *tx = &dp->dp_tx;
00580         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
00581 }
00582 
00583 boolean_t
00584 txg_sync_waiting(dsl_pool_t *dp)
00585 {
00586         tx_state_t *tx = &dp->dp_tx;
00587 
00588         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
00589             tx->tx_quiesced_txg != 0);
00590 }
00591 
00595 void
00596 txg_list_create(txg_list_t *tl, size_t offset)
00597 {
00598         int t;
00599 
00600         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
00601 
00602         tl->tl_offset = offset;
00603 
00604         for (t = 0; t < TXG_SIZE; t++)
00605                 tl->tl_head[t] = NULL;
00606 }
00607 
00608 void
00609 txg_list_destroy(txg_list_t *tl)
00610 {
00611         int t;
00612 
00613         for (t = 0; t < TXG_SIZE; t++)
00614                 ASSERT(txg_list_empty(tl, t));
00615 
00616         mutex_destroy(&tl->tl_lock);
00617 }
00618 
00619 boolean_t
00620 txg_list_empty(txg_list_t *tl, uint64_t txg)
00621 {
00622         return (tl->tl_head[txg & TXG_MASK] == NULL);
00623 }
00624 
00630 int
00631 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
00632 {
00633         int t = txg & TXG_MASK;
00634         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
00635         int already_on_list;
00636 
00637         mutex_enter(&tl->tl_lock);
00638         already_on_list = tn->tn_member[t];
00639         if (!already_on_list) {
00640                 tn->tn_member[t] = 1;
00641                 tn->tn_next[t] = tl->tl_head[t];
00642                 tl->tl_head[t] = tn;
00643         }
00644         mutex_exit(&tl->tl_lock);
00645 
00646         return (already_on_list);
00647 }
00648 
00654 int
00655 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
00656 {
00657         int t = txg & TXG_MASK;
00658         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
00659         int already_on_list;
00660 
00661         mutex_enter(&tl->tl_lock);
00662         already_on_list = tn->tn_member[t];
00663         if (!already_on_list) {
00664                 txg_node_t **tp;
00665 
00666                 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
00667                         continue;
00668 
00669                 tn->tn_member[t] = 1;
00670                 tn->tn_next[t] = NULL;
00671                 *tp = tn;
00672         }
00673         mutex_exit(&tl->tl_lock);
00674 
00675         return (already_on_list);
00676 }
00677 
00681 void *
00682 txg_list_remove(txg_list_t *tl, uint64_t txg)
00683 {
00684         int t = txg & TXG_MASK;
00685         txg_node_t *tn;
00686         void *p = NULL;
00687 
00688         mutex_enter(&tl->tl_lock);
00689         if ((tn = tl->tl_head[t]) != NULL) {
00690                 p = (char *)tn - tl->tl_offset;
00691                 tl->tl_head[t] = tn->tn_next[t];
00692                 tn->tn_next[t] = NULL;
00693                 tn->tn_member[t] = 0;
00694         }
00695         mutex_exit(&tl->tl_lock);
00696 
00697         return (p);
00698 }
00699 
00703 void *
00704 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
00705 {
00706         int t = txg & TXG_MASK;
00707         txg_node_t *tn, **tp;
00708 
00709         mutex_enter(&tl->tl_lock);
00710 
00711         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
00712                 if ((char *)tn - tl->tl_offset == p) {
00713                         *tp = tn->tn_next[t];
00714                         tn->tn_next[t] = NULL;
00715                         tn->tn_member[t] = 0;
00716                         mutex_exit(&tl->tl_lock);
00717                         return (p);
00718                 }
00719         }
00720 
00721         mutex_exit(&tl->tl_lock);
00722 
00723         return (NULL);
00724 }
00725 
00726 int
00727 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
00728 {
00729         int t = txg & TXG_MASK;
00730         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
00731 
00732         return (tn->tn_member[t]);
00733 }
00734 
00738 void *
00739 txg_list_head(txg_list_t *tl, uint64_t txg)
00740 {
00741         int t = txg & TXG_MASK;
00742         txg_node_t *tn = tl->tl_head[t];
00743 
00744         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
00745 }
00746 
00747 void *
00748 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
00749 {
00750         int t = txg & TXG_MASK;
00751         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
00752 
00753         tn = tn->tn_next[t];
00754 
00755         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
00756 }
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Defines