FreeBSD ZFS
The Zettabyte File System
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org>
 * Copyright (c) 2012 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;
        int c;

        bzero(tx, sizeof (tx_state_t));

        tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

        for (c = 0; c < max_ncpus; c++) {
                int i;

                mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
                for (i = 0; i < TXG_SIZE; i++) {
                        cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
                            NULL);
                        list_create(&tx->tx_cpu[c].tc_callbacks[i],
                            sizeof (dmu_tx_callback_t),
                            offsetof(dmu_tx_callback_t, dcb_node));
                }
        }

        mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

        cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

        tx->tx_open_txg = txg;
}
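/*
 * Close down the txg subsystem.  The sync and quiesce threads must
 * already have been stopped (tx_threads == 0) before this is called.
 */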
void
txg_fini(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;
        int c;

        ASSERT(tx->tx_threads == 0);

        mutex_destroy(&tx->tx_sync_lock);

        cv_destroy(&tx->tx_sync_more_cv);
        cv_destroy(&tx->tx_sync_done_cv);
        cv_destroy(&tx->tx_quiesce_more_cv);
        cv_destroy(&tx->tx_quiesce_done_cv);
        cv_destroy(&tx->tx_exit_cv);

        for (c = 0; c < max_ncpus; c++) {
                int i;

                mutex_destroy(&tx->tx_cpu[c].tc_lock);
                for (i = 0; i < TXG_SIZE; i++) {
                        cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
                        list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
                }
        }

        if (tx->tx_commit_cb_taskq != NULL)
                taskq_destroy(tx->tx_commit_cb_taskq);

        kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

        bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        mutex_enter(&tx->tx_sync_lock);

        dprintf("pool %p\n", dp);

        ASSERT(tx->tx_threads == 0);

        tx->tx_threads = 2;

        tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
            dp, 0, &p0, TS_RUN, minclsyspri);

        /*
         * The sync thread can need a larger-than-default stack size on
         * 32-bit x86.  This is due in part to nested pools and
         * scrub_visitbp() recursion.
         */
        tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
            dp, 0, &p0, TS_RUN, minclsyspri);

        mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
        CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
        mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
        ASSERT(*tpp != NULL);
        *tpp = NULL;
        tx->tx_threads--;
        cv_broadcast(&tx->tx_exit_cv);
        CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
        thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
        CALLB_CPR_SAFE_BEGIN(cpr);

        if (time)
                (void) cv_timedwait(cv, &tx->tx_sync_lock, time);
        else
                cv_wait(cv, &tx->tx_sync_lock);

        CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}
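/*
 * Stop syncing transaction groups.
 */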
void
txg_sync_stop(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        dprintf("pool %p\n", dp);
        /*
         * Finish off any work in progress.
         */
        ASSERT(tx->tx_threads == 2);

        /*
         * We need to ensure that we've vacated the deferred space_maps.
         */
        txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

        /*
         * Wake all sync threads and wait for them to die.
         */
        mutex_enter(&tx->tx_sync_lock);

        ASSERT(tx->tx_threads == 2);

        tx->tx_exiting = 1;

        cv_broadcast(&tx->tx_quiesce_more_cv);
        cv_broadcast(&tx->tx_quiesce_done_cv);
        cv_broadcast(&tx->tx_sync_more_cv);

        while (tx->tx_threads != 0)
                cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

        tx->tx_exiting = 0;

        mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
        tx_state_t *tx = &dp->dp_tx;
        tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
        uint64_t txg;

        mutex_enter(&tc->tc_lock);

        txg = tx->tx_open_txg;
        tc->tc_count[txg & TXG_MASK]++;

        th->th_cpu = tc;
        th->th_txg = txg;

        return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
        tx_cpu_t *tc = th->th_cpu;

        mutex_exit(&tc->tc_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;

        mutex_enter(&tc->tc_lock);
        list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
        mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;

        mutex_enter(&tc->tc_lock);
        ASSERT(tc->tc_count[g] != 0);
        if (--tc->tc_count[g] == 0)
                cv_broadcast(&tc->tc_cv[g]);
        mutex_exit(&tc->tc_lock);

        th->th_cpu = NULL;      /* defensive */
}

/*
 * Quiesce the transaction group: stop allowing new holds on the txg,
 * advance tx_open_txg, then wait for every existing holder to call
 * txg_rele_to_sync().
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;
        int g = txg & TXG_MASK;
        int c;

        /*
         * Grab all tx_cpu locks so nobody else can get into this txg.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_enter(&tx->tx_cpu[c].tc_lock);

        ASSERT(txg == tx->tx_open_txg);
        tx->tx_open_txg++;

        /*
         * Now that we've incremented tx_open_txg, we can let threads
         * enter the next transaction group.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_exit(&tx->tx_cpu[c].tc_lock);

        /*
         * Quiesce the transaction group by waiting for everyone to call
         * txg_rele_to_sync().
         */
        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                mutex_enter(&tc->tc_lock);
                while (tc->tc_count[g] != 0)
                        cv_wait(&tc->tc_cv[g], &tc->tc_lock);
                mutex_exit(&tc->tc_lock);
        }
}

static void
txg_do_callbacks(void *arg)
{
        list_t *cb_list = arg;

        dmu_tx_do_callbacks(cb_list, 0);

        list_destroy(cb_list);

        kmem_free(cb_list, sizeof (list_t));
}
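/*
 * Dispatch the commit callbacks registered on this txg to worker
 * threads.  Called only after the txg has been synced, so the per-CPU
 * callback lists can be walked without locking.
 */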
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
        int c;
        tx_state_t *tx = &dp->dp_tx;
        list_t *cb_list;

        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                /*
                 * No need to lock tx_cpu_t at this point, since this can
                 * only be called once a txg has been synced.
                 */

                int g = txg & TXG_MASK;

                if (list_is_empty(&tc->tc_callbacks[g]))
                        continue;

                if (tx->tx_commit_cb_taskq == NULL) {
                        /*
                         * Commit callback taskq hasn't been created yet.
                         */
                        tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
                            max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
                            TASKQ_PREPOPULATE);
                }

                cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
                list_create(cb_list, sizeof (dmu_tx_callback_t),
                    offsetof(dmu_tx_callback_t, dcb_node));

                list_move_tail(&tc->tc_callbacks[g], cb_list);

                (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
                    txg_do_callbacks, cb_list, TQ_SLEEP);
        }
}

static void
txg_sync_thread(void *arg)
{
        dsl_pool_t *dp = arg;
        spa_t *spa = dp->dp_spa;
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;
        uint64_t start, delta;

        txg_thread_enter(tx, &cpr);

        start = delta = 0;
        for (;;) {
                uint64_t timer, timeout = zfs_txg_timeout * hz;
                uint64_t txg;

                /*
                 * We sync when we're scanning, there's someone waiting
                 * on us, or the quiesce thread has handed off a txg to
                 * us, or we have reached our timeout.
                 */
                timer = (delta >= timeout ? 0 : timeout - delta);
                while (!dsl_scan_active(dp->dp_scan) &&
                    !tx->tx_exiting && timer > 0 &&
                    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
                    tx->tx_quiesced_txg == 0) {
                        dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
                            tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
                        txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
                        delta = ddi_get_lbolt() - start;
                        timer = (delta > timeout ? 0 : timeout - delta);
                }

                /*
                 * Wait until the quiesce thread hands off a txg to us,
                 * prompting it to do so if necessary.
                 */
                while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
                        if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
                                tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
                        cv_broadcast(&tx->tx_quiesce_more_cv);
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
                }

                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

                /*
                 * Consume the quiesced txg which has been handed off to
                 * us.  This may cause the quiescing thread to now be
                 * able to quiesce another txg, so we must signal it.
                 */
                txg = tx->tx_quiesced_txg;
                tx->tx_quiesced_txg = 0;
                tx->tx_syncing_txg = txg;
                cv_broadcast(&tx->tx_quiesce_more_cv);

                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
                mutex_exit(&tx->tx_sync_lock);

                start = ddi_get_lbolt();
                spa_sync(spa, txg);
                delta = ddi_get_lbolt() - start;

                mutex_enter(&tx->tx_sync_lock);
                tx->tx_synced_txg = txg;
                tx->tx_syncing_txg = 0;
                cv_broadcast(&tx->tx_sync_done_cv);

                /*
                 * Dispatch commit callbacks to worker threads.
                 */
                txg_dispatch_callbacks(dp, txg);
        }
}
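/*
 * The sync thread's counterpart: close the open txg on demand, wait
 * for all of its holders to drain, and hand the quiesced txg off to
 * the sync thread.
 */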
static void
txg_quiesce_thread(void *arg)
{
        dsl_pool_t *dp = arg;
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;

        txg_thread_enter(tx, &cpr);

        for (;;) {
                uint64_t txg;

                /*
                 * We quiesce when there's someone waiting on us.
                 * However, we can only have one txg in "quiescing" or
                 * "quiesced, waiting to sync" state.  So we wait until
                 * the "quiesced, waiting to sync" txg has been consumed
                 * by the sync thread.
                 */
                while (!tx->tx_exiting &&
                    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
                    tx->tx_quiesced_txg != 0))
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

                txg = tx->tx_open_txg;
                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting,
                    tx->tx_sync_txg_waiting);
                mutex_exit(&tx->tx_sync_lock);
                txg_quiesce(dp, txg);
                mutex_enter(&tx->tx_sync_lock);

                /*
                 * Hand this txg off to the sync thread.
                 */
                dprintf("quiesce done, handing off txg %llu\n", txg);
                tx->tx_quiesced_txg = txg;
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_broadcast(&tx->tx_quiesce_done_cv);
        }
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
        tx_state_t *tx = &dp->dp_tx;
        clock_t timeout = ddi_get_lbolt() + ticks;

        /* don't delay if this txg could transition to quiescing immediately */
        if (tx->tx_open_txg > txg ||
            tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
                return;

        mutex_enter(&tx->tx_sync_lock);
        if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
                mutex_exit(&tx->tx_sync_lock);
                return;
        }

        while (ddi_get_lbolt() < timeout &&
            tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
                (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
                    timeout - ddi_get_lbolt());

        mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;

        mutex_enter(&tx->tx_sync_lock);
        ASSERT(tx->tx_threads == 2);
        if (txg == 0)
                txg = tx->tx_open_txg + TXG_DEFER_SIZE;
        if (tx->tx_sync_txg_waiting < txg)
                tx->tx_sync_txg_waiting = txg;
        dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
            txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
        while (tx->tx_synced_txg < txg) {
                dprintf("broadcasting sync more "
                    "tx_synced=%llu waiting=%llu dp=%p\n",
                    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
        }
        mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;

        mutex_enter(&tx->tx_sync_lock);
        ASSERT(tx->tx_threads == 2);
        if (txg == 0)
                txg = tx->tx_open_txg + 1;
        if (tx->tx_quiesce_txg_waiting < txg)
                tx->tx_quiesce_txg_waiting = txg;
        dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
            txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
        while (tx->tx_open_txg < txg) {
                cv_broadcast(&tx->tx_quiesce_more_cv);
                cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
        }
        mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
            tx->tx_quiesced_txg != 0);
}
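/*
 * Per-txg list manipulation.  A txg_list_t keeps one singly-linked
 * list head per slot of the TXG_SIZE ring; the txg_node_t links are
 * embedded in the client structure at 'offset'.
 */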
void
txg_list_create(txg_list_t *tl, size_t offset)
{
        int t;

        mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

        tl->tl_offset = offset;

        for (t = 0; t < TXG_SIZE; t++)
                tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
        int t;

        for (t = 0; t < TXG_SIZE; t++)
                ASSERT(txg_list_empty(tl, t));

        mutex_destroy(&tl->tl_lock);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
        return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns 0 if it's a new entry, nonzero if it was already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
        int already_on_list;

        mutex_enter(&tl->tl_lock);
        already_on_list = tn->tn_member[t];
        if (!already_on_list) {
                tn->tn_member[t] = 1;
                tn->tn_next[t] = tl->tl_head[t];
                tl->tl_head[t] = tn;
        }
        mutex_exit(&tl->tl_lock);

        return (already_on_list);
}

/*
 * Add an entry to the end of the list (walks the list to find the end).
 * Returns 0 if it's a new entry, nonzero if it was already there.
 */
int
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
        int already_on_list;

        mutex_enter(&tl->tl_lock);
        already_on_list = tn->tn_member[t];
        if (!already_on_list) {
                txg_node_t **tp;

                for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
                        continue;

                tn->tn_member[t] = 1;
                tn->tn_next[t] = NULL;
                *tp = tn;
        }
        mutex_exit(&tl->tl_lock);

        return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn;
        void *p = NULL;

        mutex_enter(&tl->tl_lock);
        if ((tn = tl->tl_head[t]) != NULL) {
                p = (char *)tn - tl->tl_offset;
                tl->tl_head[t] = tn->tn_next[t];
                tn->tn_next[t] = NULL;
                tn->tn_member[t] = 0;
        }
        mutex_exit(&tl->tl_lock);

        return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn, **tp;

        mutex_enter(&tl->tl_lock);

        for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
                if ((char *)tn - tl->tl_offset == p) {
                        *tp = tn->tn_next[t];
                        tn->tn_next[t] = NULL;
                        tn->tn_member[t] = 0;
                        mutex_exit(&tl->tl_lock);
                        return (p);
                }
        }

        mutex_exit(&tl->tl_lock);

        return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

        return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you can't possibly change the list.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = tl->tl_head[t];

        return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

        tn = tn->tn_next[t];

        return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
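Taken together, these interfaces define the three-stage pipeline a transaction group moves through: open (threads join via txg_hold_open()), quiescing (txg_quiesce() waits for every hold to be released via txg_rele_to_sync()), and syncing (spa_sync() writes the group out). The following is a minimal sketch of a consumer of these interfaces, not code from this file: the helper modify_pool_state() is hypothetical, and real callers go through the DMU transaction machinery (dmu_tx) rather than calling these functions directly.

static void
example_txg_consumer(dsl_pool_t *dp)
{
        txg_handle_t th;
        uint64_t txg;

        /*
         * Join the currently open txg.  txg_hold_open() returns with
         * the per-CPU tc_lock held and the hold count bumped, so the
         * group cannot begin quiescing underneath us.
         */
        txg = txg_hold_open(dp, &th);

        /* Drop the per-CPU lock; we still hold the txg itself. */
        txg_rele_to_quiesce(&th);

        /* Dirty pool state against this txg (hypothetical helper). */
        modify_pool_state(dp, txg);

        /*
         * Release the hold.  Once every holder of this txg has done
         * the same, txg_quiesce() can hand the group to the sync
         * thread.
         */
        txg_rele_to_sync(&th);

        /* Optionally block until the group is on stable storage. */
        txg_wait_synced(dp, txg);
}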