Index: sbin/hastd/parse.y =================================================================== --- sbin/hastd/parse.y (revision 223532) +++ sbin/hastd/parse.y (working copy) @@ -293,11 +293,9 @@ yy_config_parse(const char *config, bool exitonerr */ curres->hr_replication = depth0_replication; } - if (curres->hr_replication == HAST_REPLICATION_MEMSYNC || - curres->hr_replication == HAST_REPLICATION_ASYNC) { - pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".", - curres->hr_replication == HAST_REPLICATION_MEMSYNC ? - "memsync" : "async", "fullsync"); + if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) { + pjdlog_warning("Replication mode \"memsync\" is not implemented, falling back to \"%s\".", + "fullsync"); curres->hr_replication = HAST_REPLICATION_FULLSYNC; } if (curres->hr_checksum == -1) { Index: sbin/hastd/activemap.c =================================================================== --- sbin/hastd/activemap.c (revision 223532) +++ sbin/hastd/activemap.c (working copy) @@ -671,7 +671,30 @@ activemap_need_sync(struct activemap *amp, off_t o return (modified); } +/* + * Mark dirty extents for synchronization. + */ void +activemap_mark_sync(struct activemap *amp) +{ + int ext; + + assert(amp->am_magic == ACTIVEMAP_MAGIC); + + bit_ffs(amp->am_memmap, amp->am_nextents, &ext); + if (ext == -1) { + /* There are no dirty extents, so we can leave now. */ + return; + } + for (; ext < amp->am_nextents; ext++) { + if (bit_test(amp->am_memmap, ext)) { + bit_set(amp->am_syncmap, ext); + amp->am_memtab[ext] = ext2reqs(amp, ext); + } + } +} + +void activemap_dump(const struct activemap *amp) { int bit; Index: sbin/hastd/activemap.h =================================================================== --- sbin/hastd/activemap.h (revision 223532) +++ sbin/hastd/activemap.h (working copy) @@ -63,6 +63,7 @@ void activemap_sync_rewind(struct activemap *amp); off_t activemap_sync_offset(struct activemap *amp, off_t *lengthp, int *syncextp); bool activemap_need_sync(struct activemap *amp, off_t offset, off_t length); +void activemap_mark_sync(struct activemap *amp); void activemap_dump(const struct activemap *amp); Index: sbin/hastd/primary.c =================================================================== --- sbin/hastd/primary.c (revision 223532) +++ sbin/hastd/primary.c (working copy) @@ -1202,12 +1202,22 @@ ggate_recv_thread(void *arg) res->hr_stat_flush++; break; } - pjdlog_debug(2, - "ggate_recv: (%p) Moving request to the send queues.", - hio); - refcount_init(&hio->hio_countdown, ncomps); - for (ii = 0; ii < ncomps; ii++) - QUEUE_INSERT1(hio, send, ii); + if (res->hr_replication != HAST_REPLICATION_ASYNC) { + pjdlog_debug(2, + "ggate_recv: (%p) Moving request to the send queues.", + hio); + refcount_init(&hio->hio_countdown, ncomps); + for (ii = 0; ii < ncomps; ii++) + QUEUE_INSERT1(hio, send, ii); + } else { + pjdlog_debug(2, + "ggate_recv: (%p) Moving request to the local send queue.", + hio); + refcount_init(&hio->hio_countdown, 1); + /* Local component is 0 for now. */ + ncomp = 0; + QUEUE_INSERT1(hio, send, ncomp); + } break; } } @@ -1685,7 +1695,8 @@ ggate_send_thread(void *arg) else ggio->gctl_error = hio->hio_errors[0]; } - if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { + if (res->hr_replication != HAST_REPLICATION_ASYNC && + ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { mtx_lock(&res->hr_amp_lock); activemap_write_complete(res->hr_amp, ggio->gctl_offset, ggio->gctl_length); @@ -2124,6 +2135,30 @@ guard_one(struct hast_resource *res, unsigned int rw_unlock(&hio_remote_lock[ncomp]); pjdlog_debug(2, "remote_guard: Connection to %s is ok.", res->hr_remoteaddr); + if (res->hr_syncsrc != HAST_SYNCSRC_SECONDARY && + res->hr_replication == HAST_REPLICATION_ASYNC) { + /* + * Bump local count and start synchronization. + */ + ncomp = 1; + mtx_lock(&metadata_lock); + if (activemap_ndirty(res->hr_amp) > 0) { + activemap_mark_sync(res->hr_amp); + if (res->hr_primary_localcnt == + res->hr_secondary_remotecnt) { + res->hr_primary_localcnt++; + pjdlog_debug(1, + "Increasing localcnt to %ju.", + (uintmax_t)res->hr_primary_localcnt); + (void)metadata_write(res); + } + mtx_unlock(&metadata_lock); + res->hr_syncsrc = HAST_SYNCSRC_PRIMARY; + sync_start(); + } else { + mtx_unlock(&metadata_lock); + } + } return; }