Index: sbin/hastd/hastd.c =================================================================== --- sbin/hastd/hastd.c (wersja 238109) +++ sbin/hastd/hastd.c (kopia robocza) @@ -68,7 +68,7 @@ bool sigexit_received = false; /* Path to pidfile. */ static const char *pidfile; -/* PID file handle. */ +/* Pidfile handle. */ struct pidfh *pfh; /* Do we run in foreground? */ static bool foreground; @@ -748,6 +748,7 @@ const char *resname; const unsigned char *token; char laddr[256], raddr[256]; + uint8_t version; size_t size; pid_t pid; int status; @@ -797,6 +798,19 @@ goto close; } pjdlog_debug(2, "%s: resource=%s", raddr, resname); + version = nv_get_uint8(nvin, "version"); + pjdlog_debug(2, "%s: version=%hhu", raddr, version); + if (version == 0) { + /* + * If no version is sent, it means this is protocol version 1. + */ + version = 1; + } + if (version > HAST_PROTO_VERSION) { + pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.", + version, (unsigned char)HAST_PROTO_VERSION); + version = HAST_PROTO_VERSION; + } token = nv_get_uint8_array(nvin, &size, "token"); /* * NULL token means that this is first connection. @@ -910,8 +924,10 @@ */ if (token == NULL) { + res->hr_version = version; arc4random_buf(res->hr_token, sizeof(res->hr_token)); nvout = nv_alloc(); + nv_add_uint8(nvout, version, "version"); nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), "token"); if (nv_error(nvout) != 0) { Index: sbin/hastd/secondary.c =================================================================== --- sbin/hastd/secondary.c (wersja 238109) +++ sbin/hastd/secondary.c (kopia robocza) @@ -71,6 +71,7 @@ uint8_t hio_cmd; uint64_t hio_offset; uint64_t hio_length; + bool hio_memsync; TAILQ_ENTRY(hio) hio_next; }; @@ -135,9 +136,25 @@ hio->hio_cmd = HIO_UNDEF; hio->hio_offset = 0; hio->hio_length = 0; + hio->hio_memsync = false; } static void +hio_copy(const struct hio *srchio, struct hio *dsthio) +{ + + /* + * We don't copy hio_error, hio_data and hio_next fields. + */ + + dsthio->hio_seq = srchio->hio_seq; + dsthio->hio_cmd = srchio->hio_cmd; + dsthio->hio_offset = srchio->hio_offset; + dsthio->hio_length = srchio->hio_length; + dsthio->hio_memsync = srchio->hio_memsync; +} + +static void init_environment(void) { struct hio *hio; @@ -543,8 +560,10 @@ case HIO_FLUSH: case HIO_KEEPALIVE: break; + case HIO_WRITE: + hio->hio_memsync = nv_exists(nv, "memsync"); + /* FALLTHROUGH */ case HIO_READ: - case HIO_WRITE: case HIO_DELETE: hio->hio_offset = nv_get_uint64(nv, "offset"); if (nv_error(nv) != 0) { @@ -621,7 +640,7 @@ recv_thread(void *arg) { struct hast_resource *res = arg; - struct hio *hio; + struct hio *hio, *mshio; struct nv *nv; for (;;) { @@ -675,6 +694,27 @@ secondary_exit(EX_TEMPFAIL, "Unable to receive request data"); } + if (hio->hio_memsync) { + /* + * For memsync requests we expect two replies. + * Clone the hio so we can handle both of them. + */ + pjdlog_debug(2, "recv: Taking free request."); + QUEUE_TAKE(free, mshio); + pjdlog_debug(2, "recv: (%p) Got request.", + mshio); + hio_copy(hio, mshio); + mshio->hio_error = 0; + /* + * We want to keep 'memsync' tag only on the + * request going onto send queue (mshio). + */ + hio->hio_memsync = false; + pjdlog_debug(2, + "recv: (%p) Moving memsync request to the send queue.", + mshio); + QUEUE_INSERT(send, mshio); + } } nv_free(nv); pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", @@ -818,6 +858,10 @@ nvout = nv_alloc(); /* Copy sequence number. */ nv_add_uint64(nvout, hio->hio_seq, "seq"); + if (hio->hio_memsync) { + PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE); + nv_add_int8(nvout, 1, "received"); + } switch (hio->hio_cmd) { case HIO_READ: if (hio->hio_error == 0) { Index: sbin/hastd/primary.c =================================================================== --- sbin/hastd/primary.c (wersja 238120) +++ sbin/hastd/primary.c (kopia robocza) @@ -35,7 +35,6 @@ #include #include #include -#include #include #include @@ -65,6 +64,7 @@ #include "metadata.h" #include "proto.h" #include "pjdlog.h" +#include "refcnt.h" #include "subr.h" #include "synch.h" @@ -543,7 +543,7 @@ return (0); } - + /* * Function instructs GEOM_GATE to handle reads directly from within the kernel. */ @@ -577,6 +577,7 @@ int32_t extentsize; int64_t datasize; uint32_t mapsize; + uint8_t version; size_t size; int error; @@ -597,6 +598,7 @@ */ nvout = nv_alloc(); nv_add_string(nvout, res->hr_name, "resource"); + nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); if (nv_error(nvout) != 0) { pjdlog_common(LOG_WARNING, 0, nv_error(nvout), "Unable to allocate header for connection with %s", @@ -626,6 +628,19 @@ nv_free(nvin); goto close; } + version = nv_get_uint8(nvin, "version"); + if (version == 0) { + /* + * If no version is sent, it means this is protocol version 1. + */ + version = 1; + } + if (version > HAST_PROTO_VERSION) { + pjdlog_warning("Invalid version received (%hhu).", version); + nv_free(nvin); + goto close; + } + res->hr_version = version; token = nv_get_uint8_array(nvin, &size, "token"); if (token == NULL) { pjdlog_warning("Handshake header from %s has no 'token' field.", @@ -776,6 +791,16 @@ pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); #endif pjdlog_info("Connected to %s.", res->hr_remoteaddr); + if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && + res->hr_version < 2) { + pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); + res->hr_replication = HAST_REPLICATION_FULLSYNC; + } else if (res->hr_replication != res->hr_original_replication) { + /* + * This is in case hastd disconnected and was upgraded. + */ + res->hr_replication = res->hr_original_replication; + } if (inp != NULL && outp != NULL) { *inp = in; *outp = out; @@ -1009,7 +1034,8 @@ } static void -reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) +reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, + const char *fmt, ...) { char msg[1024]; va_list ap; @@ -1020,21 +1046,18 @@ switch (ggio->gctl_cmd) { case BIO_READ: (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; case BIO_DELETE: (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; case BIO_FLUSH: (void)snprlcat(msg, sizeof(msg), "FLUSH."); break; case BIO_WRITE: (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; default: (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", @@ -1274,7 +1297,12 @@ } pjdlog_debug(2, "ggate_recv: (%p) Moving request to the send queues.", hio); - refcount_init(&hio->hio_countdown, ncomps); + hio->hio_countdown = ncomps; + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && + ggio->gctl_cmd == BIO_WRITE) { + /* Each remote request needs two responses in memsync. */ + hio->hio_countdown++; + } for (ii = ncomp; ii < ncomp + ncomps; ii++) QUEUE_INSERT1(hio, send, ii); } @@ -1385,8 +1413,41 @@ } break; } - if (!refcount_release(&hio->hio_countdown)) - continue; + + if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || + ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { + if (refcnt_release(&hio->hio_countdown) > 0) + continue; + } else { + /* + * Depending on hio_countdown value, requests finished + * in the following order: + * 0: remote memsync, remote final, local write + * 1: remote memsync, local write, (remote final) + * 2: local write, (remote memsync), (remote final) + */ + switch (refcnt_release(&hio->hio_countdown)) { + case 0: + /* + * Local write finished as last. + */ + break; + case 1: + /* + * Local write finished after remote memsync + * reply arrvied. We can complete the write now. + */ + if (hio->hio_errors[0] == 0) + write_complete(res, hio); + continue; + case 2: + /* + * Local write finished as first. + */ + continue; + default: + PJDLOG_ABORT("Invalid hio_countdown."); + } if (ISSYNCREQ(hio)) { mtx_lock(&sync_lock); SYNCREQDONE(hio); @@ -1508,6 +1569,10 @@ nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); nv_add_uint64(nv, offset, "offset"); nv_add_uint64(nv, length, "length"); + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && + ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { + nv_add_uint8(nv, 1, "memsync"); + } if (nv_error(nv) != 0) { hio->hio_errors[ncomp] = nv_error(nv); pjdlog_debug(2, @@ -1568,7 +1633,7 @@ done_queue: nv_free(nv); if (ISSYNCREQ(hio)) { - if (!refcount_release(&hio->hio_countdown)) + if (refcnt_release(&hio->hio_countdown) > 0) continue; mtx_lock(&sync_lock); SYNCREQDONE(hio); @@ -1583,8 +1648,10 @@ (void)hast_activemap_flush(res); } mtx_unlock(&res->hr_amp_lock); + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) + (void)refcnt_release(&hio->hio_countdown); } - if (!refcount_release(&hio->hio_countdown)) + if (refcnt_release(&hio->hio_countdown) > 0) continue; pjdlog_debug(2, "remote_send: (%p) Moving request to the done queue.", @@ -1608,6 +1675,7 @@ struct nv *nv; unsigned int ncomp; uint64_t seq; + bool memsyncack; int error; /* Remote component is 1 for now. */ @@ -1623,6 +1691,8 @@ } mtx_unlock(&hio_recv_list_lock[ncomp]); + memsyncack = false; + rw_rlock(&hio_remote_lock[ncomp]); if (!ISCONNECTED(res, ncomp)) { rw_unlock(&hio_remote_lock[ncomp]); @@ -1652,6 +1722,7 @@ nv_free(nv); continue; } + memsyncack = nv_exists(nv, "received"); mtx_lock(&hio_recv_list_lock[ncomp]); TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { if (hio->hio_ggio.gctl_seq == seq) { @@ -1707,8 +1778,80 @@ hio->hio_errors[ncomp] = 0; nv_free(nv); done_queue: - if (!refcount_release(&hio->hio_countdown)) - continue; + if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || + hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { + if (refcnt_release(&hio->hio_countdown) > 0) + continue; + } else { + /* + * Depending on hio_countdown value, requests finished + * in the following order: + * + * 0: local write, remote memsync, remote final + * or + * 0: remote memsync, local write, remote final + * + * 1: local write, remote memsync, (remote final) + * or + * 1: remote memsync, remote final, (local write) + * + * 2: remote memsync, (local write), (remote final) + * or + * 2: remote memsync, (remote final), (local write) + */ + switch (refcnt_release(&hio->hio_countdown)) { + case 0: + /* + * Remote final reply arrived. + */ + PJDLOG_ASSERT(!memsyncack); + break; + case 1: + if (memsyncack) { + /* + * Local request already finished, so we + * can complete the write. + */ + if (hio->hio_errors[0] == 0) + write_complete(res, hio); + /* + * We still need to wait for final + * remote reply. + */ + pjdlog_debug(2, + "remote_recv: (%p) Moving request back to the recv queue.", + hio); + mtx_lock(&hio_recv_list_lock[ncomp]); + TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], + hio, hio_next[ncomp]); + mtx_unlock(&hio_recv_list_lock[ncomp]); + } else { + /* + * Remote final reply arrived before + * local write finished. + * Nothing to do in such case. + */ + } + continue; + case 2: + /* + * We received remote memsync reply even before + * local write finished. + */ + PJDLOG_ASSERT(memsyncack); + + pjdlog_debug(2, + "remote_recv: (%p) Moving request back to the recv queue.", + hio); + mtx_lock(&hio_recv_list_lock[ncomp]); + TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, + hio_next[ncomp]); + mtx_unlock(&hio_recv_list_lock[ncomp]); + continue; + default: + PJDLOG_ABORT("Invalid hio_countdown."); + } + } if (ISSYNCREQ(hio)) { mtx_lock(&sync_lock); SYNCREQDONE(hio); @@ -1977,7 +2120,7 @@ ncomp = 1; } mtx_unlock(&metadata_lock); - refcount_init(&hio->hio_countdown, 1); + hio->hio_countdown = 1; QUEUE_INSERT1(hio, send, ncomp); /* @@ -2027,7 +2170,7 @@ pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", hio); - refcount_init(&hio->hio_countdown, 1); + hio->hio_countdown = 1; QUEUE_INSERT1(hio, send, ncomp); /* Index: sbin/hastd/parse.y =================================================================== --- sbin/hastd/parse.y (wersja 238109) +++ sbin/hastd/parse.y (kopia robocza) @@ -236,6 +236,7 @@ case 1: PJDLOG_ASSERT(curres != NULL); curres->hr_replication = $2; + curres->hr_original_replication = $2; break; default: PJDLOG_ABORT("replication at wrong depth level"); @@ -533,6 +534,7 @@ curres->hr_role = HAST_ROLE_INIT; curres->hr_previous_role = HAST_ROLE_INIT; curres->hr_replication = -1; + curres->hr_original_replication = -1; curres->hr_checksum = -1; curres->hr_compression = -1; curres->hr_timeout = -1; @@ -724,6 +726,7 @@ isitme(const char *name) { char buf[MAXHOSTNAMELEN]; + unsigned long hostid; char *pos; size_t bufsize; @@ -738,7 +741,7 @@ return (1); /* - * Now check if it matches first part of the host name. + * Check if it matches first part of the host name. */ pos = strchr(buf, '.'); if (pos != NULL && (size_t)(pos - buf) == strlen(name) && @@ -747,7 +750,7 @@ } /* - * At the end check if name is equal to our host's UUID. + * Check if it matches host UUID. */ bufsize = sizeof(buf); if (sysctlbyname("kern.hostuuid", buf, &bufsize, NULL, 0) < 0) { @@ -758,6 +761,18 @@ return (1); /* + * Check if it matches hostid. + */ + bufsize = sizeof(hostid); + if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) { + pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed"); + return (-1); + } + (void)snprintf(buf, sizeof(buf), "hostid%lu", hostid); + if (strcmp(buf, name) == 0) + return (1); + + /* * Looks like this isn't about us. */ return (0); @@ -781,6 +796,7 @@ { static char names[MAXHOSTNAMELEN * 3]; char buf[MAXHOSTNAMELEN]; + unsigned long hostid; char *pos; size_t bufsize; @@ -808,7 +824,17 @@ return (-1); } (void)strlcat(names, buf, sizeof(names)); + (void)strlcat(names, ", ", sizeof(names)); + /* Host ID. */ + bufsize = sizeof(hostid); + if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) { + pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed"); + return (-1); + } + (void)snprintf(buf, sizeof(buf), "hostid%lu", hostid); + (void)strlcat(names, buf, sizeof(names)); + *namesp = names; return (0); @@ -833,7 +859,7 @@ lineno = 0; depth0_timeout = HAST_TIMEOUT; - depth0_replication = HAST_REPLICATION_FULLSYNC; + depth0_replication = HAST_REPLICATION_MEMSYNC; depth0_checksum = HAST_CHECKSUM_NONE; depth0_compression = HAST_COMPRESSION_HOLE; strlcpy(depth0_control, HAST_CONTROL, sizeof(depth0_control)); @@ -943,12 +969,8 @@ * Use global or default setting. */ curres->hr_replication = depth0_replication; + curres->hr_original_replication = depth0_replication; } - if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) { - pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".", - "memsync", "fullsync"); - curres->hr_replication = HAST_REPLICATION_FULLSYNC; - } if (curres->hr_checksum == -1) { /* * Checksum is not set at resource-level. Index: sbin/hastd/hast.h =================================================================== --- sbin/hastd/hast.h (wersja 238109) +++ sbin/hastd/hast.h (kopia robocza) @@ -53,8 +53,9 @@ * Version history: * 0 - initial version * 1 - HIO_KEEPALIVE added + * 2 - "memsync" and "received" attributes added for memsync mode */ -#define HAST_PROTO_VERSION 1 +#define HAST_PROTO_VERSION 2 #define EHAST_OK 0 #define EHAST_NOENTRY 1 @@ -142,8 +143,10 @@ struct hast_resource { /* Resource name. */ char hr_name[NAME_MAX]; - /* Replication mode (HAST_REPLICATION_*). */ + /* Negotiated replication mode (HAST_REPLICATION_*). */ int hr_replication; + /* Configured replication mode (HAST_REPLICATION_*). */ + int hr_original_replication; /* Provider name that will appear in /dev/hast/. */ char hr_provname[NAME_MAX]; /* Synchronization extent size. */ @@ -156,6 +159,8 @@ int hr_compression; /* Checksum algorithm. */ int hr_checksum; + /* Protocol version. */ + int hr_version; /* Path to local component. */ char hr_localpath[PATH_MAX]; Index: sbin/hastd/refcnt.h =================================================================== --- sbin/hastd/refcnt.h (wersja 0) +++ sbin/hastd/refcnt.h (kopia robocza) @@ -0,0 +1,57 @@ +/*- + * Copyright (c) 2005 John Baldwin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the author nor the names of any co-contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * $FreeBSD$ + */ + +#ifndef __REFCNT_H__ +#define __REFCNT_H__ + +#include + +#include "pjdlog.h" + +static __inline void +refcnt_acquire(volatile unsigned int *count) +{ + + atomic_add_acq_int(count, 1); +} + +static __inline unsigned int +refcnt_release(volatile unsigned int *count) +{ + unsigned int old; + + /* XXX: Should this have a rel membar? */ + old = atomic_fetchadd_int(count, -1); + PJDLOG_ASSERT(old > 0); + return (old - 1); +} + +#endif /* ! __REFCNT_H__ */ Zmiany atrybutów dla: sbin/hastd/refcnt.h ___________________________________________________________________ Added: svn:mime-type ## -0,0 +1 ## +text/plain \ No newline at end of property Added: svn:keywords ## -0,0 +1 ## +FreeBSD=%H \ No newline at end of property Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: sbin/hastd/hast.conf.5 =================================================================== --- sbin/hastd/hast.conf.5 (wersja 238109) +++ sbin/hastd/hast.conf.5 (kopia robocza) @@ -129,9 +129,13 @@ .Aq node argument can be replaced either by a full hostname as obtained by .Xr gethostname 3 , -only first part of the hostname, or by node's UUID as found in the +only first part of the hostname, by node's UUID as found in the .Va kern.hostuuid .Xr sysctl 8 +variable +or by node's hostid as found in the +.Va kern.hostid +.Xr sysctl 8 variable. .Pp The following statements are available: @@ -208,15 +212,12 @@ The risk of such a situation is very small. The .Ic memsync -replication mode is currently not implemented. +replication mode is the default. .It Ic fullsync .Pp Mark the write operation as completed when local as well as remote write completes. This is the safest and the slowest replication mode. -The -.Ic fullsync -replication mode is the default. .It Ic async .Pp The write operation is reported as complete right after the local write