For memsync replication, hio_countdown is used not only as an indication
that a request can be moved to the done queue, but also for detecting the
current state of a memsync request. This approach has problems, e.g.
leaking a request if the memsync ack from the secondary failed, or racy
use of write_complete(), which should be called only once per write
request but for memsync can be entered by local_send_thread and
ggate_send_thread simultaneously.

So the following approach is implemented instead:

1) Use hio_countdown only for counting the components we are waiting on
   to complete, i.e. initially it is always 2 for any replication mode.

2) To distinguish between the "memsync ack" and "memsync fin" responses
   from the secondary, add and use the hio_memsyncacked field.

3) write_complete() in component threads is called only before releasing
   hio_countdown (i.e. before the hio may be returned to the done queue).

4) Add and use the hio_writecount refcounter to detect when
   write_complete() should be called in the memsync case.

5) As hio_done is used only for async, rename it to hio_asyncdone and
   check/modify it outside of the more generic write_complete().

A compilable sketch of the resulting completion flow follows the patch
below.

Reported by:	Pete French petefrench@ingresso.co.uk

Index: sbin/hastd/primary.c
===================================================================
--- sbin/hastd/primary.c	(revision 258256)
+++ sbin/hastd/primary.c	(working copy)
@@ -90,10 +90,19 @@ struct hio {
 	 */
 	struct g_gate_ctl_io	 hio_ggio;
 	/*
-	 * Request was already confirmed to GEOM Gate.
+	 * Write request was already confirmed to GEOM Gate. Used for async.
 	 */
-	bool			 hio_done;
+	bool			 hio_asyncdone;
 	/*
+	 * Number of components we are still waiting before sending write
+	 * completion ack to GEOM Gate. Used for memsync.
+	 */
+	refcnt_t		 hio_writecount;
+	/*
+	 * Memsync request was acknowledged by remote.
+	 */
+	bool			 hio_memsyncacked;
+	/*
 	 * Remember replication from the time the request was initiated,
 	 * so we won't get confused when replication changes on reload.
 	 */
@@ -231,6 +240,9 @@ static pthread_mutex_t metadata_lock;
 #define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
 #define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
 #define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
+#define	ISMEMSYNCWRITE(hio)	\
+	(((hio)->hio_replication == HAST_REPLICATION_MEMSYNC &&	\
+	    (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ((hio))))
 
 static struct hast_resource *gres;
 
@@ -1172,8 +1184,6 @@ write_complete(struct hast_resource *res, struct h
 	struct g_gate_ctl_io *ggio;
 	unsigned int ncomp;
 
-	PJDLOG_ASSERT(!hio->hio_done);
-
 	ggio = &hio->hio_ggio;
 	PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
 
@@ -1196,7 +1206,6 @@ write_complete(struct hast_resource *res, struct h
 	rw_unlock(&hio_remote_lock[ncomp]);
 	if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1)
 		primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
-	hio->hio_done = true;
 }
 
 /*
@@ -1227,7 +1236,6 @@ ggate_recv_thread(void *arg)
 		ggio->gctl_unit = res->hr_ggateunit;
 		ggio->gctl_length = MAXPHYS;
 		ggio->gctl_error = 0;
-		hio->hio_done = false;
 		hio->hio_replication = res->hr_replication;
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Waiting for request from the kernel.",
@@ -1344,6 +1352,13 @@ ggate_recv_thread(void *arg)
 			} else {
 				mtx_unlock(&res->hr_amp_lock);
 			}
+			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) {
+				hio->hio_memsyncacked = false;
+				refcnt_init(&hio->hio_writecount, ncomps);
+			} else if (hio->hio_replication ==
+			    HAST_REPLICATION_ASYNC) {
+				hio->hio_asyncdone = false;
+			}
 			break;
 		case BIO_DELETE:
 			res->hr_stat_delete++;
@@ -1354,13 +1369,7 @@ ggate_recv_thread(void *arg)
 		}
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Moving request to the send queues.", hio);
-		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
-		    ggio->gctl_cmd == BIO_WRITE) {
-			/* Each remote request needs two responses in memsync. */
-			refcnt_init(&hio->hio_countdown, ncomps + 1);
-		} else {
-			refcnt_init(&hio->hio_countdown, ncomps);
-		}
+		refcnt_init(&hio->hio_countdown, ncomps);
 		for (ii = ncomp; ii < ncomps; ii++)
 			QUEUE_INSERT1(hio, send, ii);
 	}
@@ -1435,6 +1444,7 @@ local_send_thread(void *arg)
 			    HAST_REPLICATION_ASYNC) {
 				ggio->gctl_error = 0;
 				write_complete(res, hio);
+				hio->hio_asyncdone = true;
 			}
 		}
 		break;
@@ -1470,42 +1480,12 @@ local_send_thread(void *arg)
 			}
 			break;
 		}
-
-		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
-		    ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
-			if (refcnt_release(&hio->hio_countdown) > 0)
-				continue;
-		} else {
-			/*
-			 * Depending on hio_countdown value, requests finished
-			 * in the following order:
-			 * 0: remote memsync, remote final, local write
-			 * 1: remote memsync, local write, (remote final)
-			 * 2: local write, (remote memsync), (remote final)
-			 */
-			switch (refcnt_release(&hio->hio_countdown)) {
-			case 0:
-				/*
-				 * Local write finished as last.
-				 */
-				break;
-			case 1:
-				/*
-				 * Local write finished after remote memsync
-				 * reply arrvied. We can complete the write now.
-				 */
-				if (hio->hio_errors[0] == 0)
-					write_complete(res, hio);
-				continue;
-			case 2:
-				/*
-				 * Local write finished as first.
-				 */
-				continue;
-			default:
-				PJDLOG_ABORT("Invalid hio_countdown.");
-			}
+		if (ISMEMSYNCWRITE(hio)) {
+			if (refcnt_release(&hio->hio_writecount) == 0)
+				write_complete(res, hio);
 		}
+		if (refcnt_release(&hio->hio_countdown) > 0)
+			continue;
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1627,10 +1607,8 @@ remote_send_thread(void *arg)
 		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
 		nv_add_uint64(nv, offset, "offset");
 		nv_add_uint64(nv, length, "length");
-		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
-		    ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
+		if (ISMEMSYNCWRITE(hio))
 			nv_add_uint8(nv, 1, "memsync");
-		}
 		if (nv_error(nv) != 0) {
 			hio->hio_errors[ncomp] = nv_error(nv);
 			pjdlog_debug(2,
@@ -1709,8 +1687,10 @@ done_queue:
 			} else {
 				mtx_unlock(&res->hr_amp_lock);
 			}
-			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
-				(void)refcnt_release(&hio->hio_countdown);
+			if (ISMEMSYNCWRITE(hio)) {
+				if (refcnt_release(&hio->hio_writecount) == 0)
+					write_complete(res, hio);
+			}
 		}
 		if (refcnt_release(&hio->hio_countdown) > 0)
 			continue;
@@ -1768,6 +1748,7 @@ remote_recv_thread(void *arg)
 			    hio_next[ncomp]);
 			hio_recv_list_size[ncomp]--;
 			mtx_unlock(&hio_recv_list_lock[ncomp]);
+			hio->hio_errors[ncomp] = ENOTCONN;
 			goto done_queue;
 		}
 		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
@@ -1841,82 +1822,31 @@ remote_recv_thread(void *arg)
 		hio->hio_errors[ncomp] = 0;
 		nv_free(nv);
 done_queue:
-		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
-		    hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
-			if (refcnt_release(&hio->hio_countdown) > 0)
-				continue;
-		} else {
-			/*
-			 * Depending on hio_countdown value, requests finished
-			 * in the following order:
-			 *
-			 * 0: local write, remote memsync, remote final
-			 * or
-			 * 0: remote memsync, local write, remote final
-			 *
-			 * 1: local write, remote memsync, (remote final)
-			 * or
-			 * 1: remote memsync, remote final, (local write)
-			 *
-			 * 2: remote memsync, (local write), (remote final)
-			 * or
-			 * 2: remote memsync, (remote final), (local write)
-			 */
-			switch (refcnt_release(&hio->hio_countdown)) {
-			case 0:
-				/*
-				 * Remote final reply arrived.
-				 */
-				PJDLOG_ASSERT(!memsyncack);
-				break;
-			case 1:
-				if (memsyncack) {
-					/*
-					 * Local request already finished, so we
-					 * can complete the write.
-					 */
-					if (hio->hio_errors[0] == 0)
-						write_complete(res, hio);
-					/*
-					 * We still need to wait for final
-					 * remote reply.
-					 */
+		if (ISMEMSYNCWRITE(hio)) {
+			if (!hio->hio_memsyncacked) {
+				PJDLOG_ASSERT(memsyncack);
+				/* Remote ack arrived. */
+				if (refcnt_release(&hio->hio_writecount) == 0)
+					write_complete(res, hio);
+				hio->hio_memsyncacked = true;
+				if (hio->hio_errors[ncomp] == 0) {
 					pjdlog_debug(2,
-					    "remote_recv: (%p) Moving request back to the recv queue.",
-					    hio);
+					    "remote_recv: (%p) Moving request "
+					    "back to the recv queue.", hio);
 					mtx_lock(&hio_recv_list_lock[ncomp]);
 					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
 					    hio, hio_next[ncomp]);
 					hio_recv_list_size[ncomp]++;
 					mtx_unlock(&hio_recv_list_lock[ncomp]);
-				} else {
-					/*
-					 * Remote final reply arrived before
-					 * local write finished.
-					 * Nothing to do in such case.
-					 */
+					continue;
 				}
-				continue;
-			case 2:
-				/*
-				 * We received remote memsync reply even before
-				 * local write finished.
-				 */
-				PJDLOG_ASSERT(memsyncack);
-
-				pjdlog_debug(2,
-				    "remote_recv: (%p) Moving request back to the recv queue.",
-				    hio);
-				mtx_lock(&hio_recv_list_lock[ncomp]);
-				TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
-				    hio_next[ncomp]);
-				hio_recv_list_size[ncomp]++;
-				mtx_unlock(&hio_recv_list_lock[ncomp]);
-				continue;
-			default:
-				PJDLOG_ABORT("Invalid hio_countdown.");
+			} else {
+				PJDLOG_ASSERT(!memsyncack);
+				/* Remote final reply arrived. */
 			}
 		}
+		if (refcnt_release(&hio->hio_countdown) > 0)
+			continue;
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1993,8 +1923,11 @@ ggate_send_thread(void *arg)
 			if (range_sync_wait)
 				cv_signal(&range_sync_cond);
 			mtx_unlock(&range_lock);
-			if (!hio->hio_done)
+			if (hio->hio_replication == HAST_REPLICATION_FULLSYNC ||
+			    (hio->hio_replication == HAST_REPLICATION_ASYNC &&
+			    !hio->hio_asyncdone)) {
 				write_complete(res, hio);
+			}
 		} else {
 			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
 				primary_exit(EX_OSERR,
@@ -2179,7 +2112,6 @@ sync_thread(void *arg __unused)
 		ggio->gctl_offset = offset;
 		ggio->gctl_length = length;
 		ggio->gctl_error = 0;
-		hio->hio_done = false;
 		hio->hio_replication = res->hr_replication;
 		for (ii = 0; ii < ncomps; ii++)
 			hio->hio_errors[ii] = EINVAL;
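
The sketch promised above: a minimal, standalone program (not hastd code)
walking the "local write finishes first" ordering of a memsync write.
refcnt_t, refcnt_init() and refcnt_release() here are C11-atomic stand-ins
for hastd's refcnt API, and component_done() is a hypothetical helper
condensing what local_send_thread and remote_recv_thread do on completion.

/*
 * Illustrative sketch of the new completion accounting; not hastd code.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

typedef atomic_uint refcnt_t;

static void
refcnt_init(refcnt_t *rc, unsigned int v)
{

	atomic_store(rc, v);
}

/* Returns the number of references remaining after this release. */
static unsigned int
refcnt_release(refcnt_t *rc)
{

	return (atomic_fetch_sub(rc, 1) - 1);
}

struct hio {
	refcnt_t hio_countdown;		/* components still outstanding */
	refcnt_t hio_writecount;	/* releases before write completion */
	bool	 hio_memsyncacked;	/* memsync ack already processed */
};

/* Must run exactly once per write request. */
static void
write_complete(struct hio *hio)
{

	printf("completing write (%p) to GEOM Gate\n", (void *)hio);
}

/*
 * One component (the local write, or the remote "memsync fin" reply) is
 * done.  write_complete() is only ever called before the countdown
 * release, so the hio cannot reach the done queue with its completion
 * still pending.  Returns true once the hio may move to the done queue.
 */
static bool
component_done(struct hio *hio, bool memsync_write)
{

	if (memsync_write && refcnt_release(&hio->hio_writecount) == 0)
		write_complete(hio);
	return (refcnt_release(&hio->hio_countdown) == 0);
}

int
main(void)
{
	struct hio hio;
	unsigned int ncomps = 2;	/* local disk + remote component */

	/* Setup as in ggate_recv_thread: countdown is always ncomps now. */
	hio.hio_memsyncacked = false;
	refcnt_init(&hio.hio_countdown, ncomps);
	refcnt_init(&hio.hio_writecount, ncomps);

	/* Local write finishes first: releases writecount and countdown. */
	(void)component_done(&hio, true);

	/*
	 * Remote "memsync ack" arrives: it drops writecount to zero and so
	 * completes the write, but it does NOT release countdown; the hio
	 * goes back to the recv queue to wait for the "memsync fin" reply.
	 */
	if (refcnt_release(&hio.hio_writecount) == 0)
		write_complete(&hio);
	hio.hio_memsyncacked = true;

	/* Remote "memsync fin" releases the last countdown reference. */
	if (component_done(&hio, false))
		printf("hio may move to the done queue\n");
	return (0);
}

Whichever of the local write and the remote ack comes second drops
hio_writecount to zero and completes the write exactly once; the request
leaks no reference if the ack never arrives, since the error path releases
hio_writecount as well.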