CFLAGS+= -Wall -I/usr/include/kqueue -I. -fPIC -g
+# Force-include libbsd's <sys/time.h>; presumably this is what supplies the
+# BSD timespec helpers (timespecclear/timespeccmp) used by the timestamp
+# code in this patch -- TODO confirm.
+CFLAGS+= -include bsd/sys/time.h
LDFLAGS+= -lkqueue -L. -lbsd -lhashtab
SRCS=$(wildcard *.c)
OBJS=$(patsubst %.c, %.o, $(SRCS))
#include <stdio.h>
+
+/*
+ * Allocate the per-node storage: a pair of hash tables where hts[0] maps
+ * key -> value string and hts[1] maps key -> struct timespec (the write
+ * timestamp).  On success *ht points at the pair and 1 is returned; on
+ * failure *ht is left untouched and 0 is returned.
+ */
static int
ht_init(void **ht)
{
+	struct hashtab *hts;
+
if (ht == NULL)
return 0;
- *ht = calloc(1, sizeof(struct hashtab));
- return hashtab_init(*ht, 16, NULL);
+	/* allocate two hashtables: one for data and one for timestamps */
+	hts = calloc(2, sizeof(*hts));
+	if (hts == NULL)
+		return 0;
+	if (!hashtab_init(&hts[0], 16, NULL) ||
+	    !hashtab_init(&hts[1], 16, NULL)) {
+		/*
+		 * NOTE(review): no hashtab teardown routine is visible here, so
+		 * anything hashtab_init() allocated inside hts[0] is not released;
+		 * at least the pair itself no longer leaks.
+		 */
+		free(hts);
+		return 0;
+	}
+	*ht = hts;
+	return 1;
}
+/*
+ * Store key -> val in the data table (hts[0]) and key -> *ts in the
+ * timestamp table (hts[1]).  Returns 1 when both writes succeed, 0
+ * otherwise (including NULL arguments).
+ */
static int
-ht_put(void *ht, const char *key, const char *val)
+ht_put(void *ht, const char *key, const char *val, struct timespec *ts)
{
- return hashtab_put(ht, (void *)key, strlen(key) + 1, (void *)val, strlen(val) + 1);
+	struct hashtab *hts = ht;
+	size_t klen;
+
+	if (hts == NULL || key == NULL || val == NULL || ts == NULL)
+		return 0;
+	klen = strlen(key) + 1;	/* keys are stored with their NUL */
+	if (!hashtab_put(&hts[0], (void *)key, klen, (void *)val, strlen(val) + 1))
+		return 0;
+	if (!hashtab_put(&hts[1], (void *)key, klen, (void *)ts, sizeof(*ts))) {
+		/*
+		 * Do not leave the new value behind without its timestamp:
+		 * ht_get() could otherwise pair it with an older timestamp
+		 * still stored for the same key.
+		 */
+		(void)hashtab_del(&hts[0], (void *)key, klen);
+		return 0;
+	}
+	return 1;
}
+/*
+ * Look key up in the data table and then in the timestamp table.
+ * hashtab_get() fills *val and *ts; callers in this patch copy *val
+ * rather than freeing it.  Returns 1 only if both lookups succeed.
+ */
static int
-ht_get(void *ht, const char *key, char **val)
+ht_get(void *ht, const char *key, char **val, struct timespec **ts)
{
size_t len;
- return hashtab_get(ht, (void *)key, strlen(key) + 1, (void **)val, &len);
+	struct hashtab *tables = ht;
+	size_t klen = strlen(key) + 1;
+
+	/* the timestamp lookup only runs once the value has been found */
+	return hashtab_get(&tables[0], (void *)key, klen, (void **)val, &len) &&
+	    hashtab_get(&tables[1], (void *)key, klen, (void **)ts, &len);
}
+/*
+ * Remove key from the data table and then from the timestamp table.
+ * Returns 1 only if both removals succeed; stops at the first failure.
+ */
-int
+static int
ht_del(void *ht, const char *key)
{
- return hashtab_del(ht, (void *)key, strlen(key) + 1);
+	struct hashtab *tables = ht;
+	size_t klen = strlen(key) + 1;
+
+	if (hashtab_del(&tables[0], (void *)key, klen) &&
+	    hashtab_del(&tables[1], (void *)key, klen))
+		return 1;
+	return 0;
}
static uint64_t
@@ -245,12 +271,14 @@ post_init(struct dht_node *node)
+/*
+ * Serve peers from the node's kqueue: TCP peers send textual
+ * "PUT <key> <sec>.<nsec> <val>" and "GET <key>" requests that are applied
+ * to / answered from the local store via node->opts; UDP is not handled yet.
+ */
void
dht_event_loop(struct dht_node *node)
{
+	char *vp;
char buf[BUFSIZ + 1];
char cmd[BUFSIZ + 1], key[BUFSIZ + 1], val[BUFSIZ + 1];
int n, s;
int cl;
struct kevent kev;
struct sockaddr_storage sw_addr;
+	struct timespec ts, *tsp;
socklen_t addr_len = sizeof(struct sockaddr_storage);
ssize_t bytes;
@@ -281,17 +309,31 @@ dht_event_loop(struct dht_node *node)
EV_SET(&kev, s, EVFILT_READ, EV_DELETE, 0, 0, NULL);
if ((kevent(node->ev_queue, &kev, 1, NULL, 0, NULL)) == -1)
break;
- printf("disconnected.\n");
+ printf("[%s] peer disconnected.\n", node->id);
} else {
#ifdef DEBUG
- printf("read %zd bytes.\n", bytes);
+ printf("[%s] read %zd bytes.\n", node->id, bytes);
#endif
- /* TODO: implement a parser */
- sscanf(buf, "%s %s %s", cmd, key, val); /* unsafe */
- if (!strncmp(cmd, "PUT", sizeof(cmd)))
- ht_put(node->ht, key, val);
+			/* TODO: implement a proper parser */
+			/*
+			 * Every token is scanned out of buf, which is no larger than
+			 * cmd/key/val, so the %s conversions cannot overrun them as
+			 * long as buf is NUL-terminated (done outside this view --
+			 * confirm).  Tokens are re-read from the start of buf with
+			 * "%*s" instead of buf + 4, which could step past the NUL.
+			 */
+			cmd[0] = '\0';
+			(void)sscanf(buf, "%s", cmd);
+			if (!strncmp(cmd, "PUT", sizeof(cmd))) {
+				long long sec;
+				long nsec;
+
+				/* scan into long long/long: time_t has no scanf conversion */
+				if (sscanf(buf, "%*s %s %lld.%ld %s", key, &sec, &nsec,
+				    val) == 4) {
+					ts.tv_sec = (time_t)sec;
+					ts.tv_nsec = nsec;
+					node->opts->ht_put(node->ht, key, val, &ts);
+				} else
+					printf("[%s] malformed PUT request.\n", node->id);
+			}
+			else if (!strncmp(cmd, "GET", sizeof(cmd))) {
+				if (sscanf(buf, "%*s %s", key) == 1 &&
+				    node->opts->ht_get(node->ht, key, &vp, &tsp)) {
+					/*
+					 * Reply with the timestamp stored for key (*tsp); the
+					 * local ts is only ever set by PUT requests.
+					 */
+					snprintf(val, sizeof(val), "%lld.%.9ld %s\n",
+					    (long long)tsp->tv_sec, tsp->tv_nsec, vp);
+					write(s, val, strlen(val));
+				}
+				/*
+				 * XXX: else -- nothing is sent back on a miss, so a
+				 * requester waiting in read() for this reply (as
+				 * dht_get_tunable() does, presumably on a blocking socket)
+				 * never hears back.
+				 */
+			}
+			else {
+				printf("[%s] unknown command: %s.\n", node->id, cmd);
+			}
}
} else { /* UDP */
+ /* XXX: NOT YET IMPLEMENTED */
bytes = recvfrom(s, buf, BUFSIZ, 0, (struct sockaddr *)&sw_addr, &addr_len);
if (bytes == 0)
break;
@@ -306,15 +348,15 @@ dht_event_loop(struct dht_node *node)
}
+/*
+ * Store key/val with write timestamp *ts.  Builds the
+ * "PUT <key> <sec>.<nsec> <val>" message, finds the key's coordinator, and
+ * for each replica writes through node->opts->ht_put when the replica is
+ * this node; remote replicas are presumably sent msg (that part of the loop
+ * is outside this view).  NOTE(review): w is presumably the number of
+ * replica acknowledgements required, mirroring dht_get_tunable()'s r --
+ * confirm.  Returns 0 on NULL arguments or allocation failure.
+ */
int
-dht_put_tunable(struct dht_node *node, const char *key, const char *val, int w)
+dht_put_tunable(struct dht_node *node, const char *key, const char *val, struct timespec *ts, int w)
{
char *msg;
int coord, i = 0, replica, success = 0;
uint64_t token;
- if (node == NULL || key == NULL || val == NULL)
+ if (node == NULL || key == NULL || val == NULL || ts == NULL)
return 0;
- asprintf(&msg, "PUT %s %s\n", key, val);
+	/*
+	 * time_t has no portable printf conversion, hence the cast; %.9ld
+	 * zero-pads nanoseconds so "<sec>.<nsec>" reads as a decimal fraction.
+	 */
+	if (asprintf(&msg, "PUT %s %lld.%.9ld %s\n", key,
+	    (long long)ts->tv_sec, ts->tv_nsec, val) == -1)
+		return 0;
token = murmur3_partitioner(key);
coord = find_coordinator(node->peers, node->n_peers, token);
@@ -328,7 +370,7 @@ dht_put_tunable(struct dht_node *node, const char *key, const char *val, int w)
printf("[%s] replica: %d\n", node->id, replica);
#endif
if (node->peers[replica].token == node->token) {
- if (node->opts->ht_put(node->ht, key, val))
+ if (node->opts->ht_put(node->ht, key, val, ts))
success++;
continue;
}
@@ -351,3 +393,84 @@ dht_put_tunable(struct dht_node *node, const char *key, const char *val, int w)
return 0;
}
+/*
+ * If *ts is at least as recent as *recent_ts, replace *recent_val with a
+ * heap copy of v and advance *recent_ts.  Returns 0 only when the copy
+ * cannot be allocated (the previous newest value is then kept).
+ */
+static int
+keep_newest(struct timespec *recent_ts, char **recent_val,
+    const struct timespec *ts, const char *v)
+{
+	char *copy;
+
+	if (timespeccmp(ts, recent_ts, <))
+		return 1;
+	if (asprintf(&copy, "%s", v) == -1)
+		return 0;
+	free(*recent_val);
+	*recent_val = copy;
+	*recent_ts = *ts;
+	return 1;
+}
+
+/*
+ * Read key from the n_replicas replicas starting at its coordinator and
+ * keep the value carrying the newest timestamp (last writer wins).
+ *
+ * Each replica that yields a well-formed value counts towards success.  If
+ * any did, *val receives a heap copy of the newest value, which the caller
+ * must free(3).  Returns 1 when at least r replicas answered and a value
+ * was actually obtained, 0 otherwise.
+ */
+int
+dht_get_tunable(struct dht_node *node, const char *key, char **val, int r)
+{
+	char *msg, *tmpval, *recent_val = NULL;
+	char buf[BUFSIZ + 1], vbuf[BUFSIZ + 1];
+	int coord, i = 0, replica, success = 0;
+	struct timespec recent_ts, ts, *tsp;
+	long long sec;
+	long nsec;
+	ssize_t bytes;
+	uint64_t token;
+
+	if (node == NULL || key == NULL || val == NULL)
+		return 0;
+	timespecclear(&recent_ts);
+	if (asprintf(&msg, "GET %s\n", key) == -1)
+		return 0;
+
+	token = murmur3_partitioner(key);
+	coord = find_coordinator(node->peers, node->n_peers, token);
+#ifdef DEBUG
+	printf("[%s] coord for \"%s\" 0x%llx is 0x%llx\n", node->id, key,
+	    (unsigned long long)token,
+	    (unsigned long long)node->peers[coord].token);
+#endif
+
+	while (i < node->n_replicas) {
+		replica = (coord + i++) % node->n_peers;
+#ifdef DEBUG
+		printf("[%s] replica: %d\n", node->id, replica);
+#endif
+		if (node->peers[replica].token == node->token) {
+			/* this node is itself a replica: read the local table */
+			if (node->opts->ht_get(node->ht, key, &tmpval, &tsp) &&
+			    keep_newest(&recent_ts, &recent_val, tsp, tmpval))
+				success++;
+			continue;
+		}
+		if (!node->peers[replica].ready)
+			continue;
+		if (!(node->flags & DHT_USEUDP)) { /* TCP */
+			bytes = write(node->peers[replica].socket, msg, strlen(msg));
+			if (bytes <= 0)
+				continue;
+#ifdef DEBUG
+			printf("[%s] message sent to peer [0x%llx].\n", node->id,
+			    (unsigned long long)node->peers[replica].token);
+#endif
+			/*
+			 * NOTE(review): the GET handler in dht_event_loop() sends
+			 * nothing when it misses the key, so on a blocking socket this
+			 * read() waits indefinitely.
+			 */
+			bytes = read(node->peers[replica].socket, buf, sizeof(buf));
+#ifdef DEBUG
+			printf("[%s] reponse is %zd bytes\n", node->id, bytes);
+#endif
+			if (bytes <= 0)
+				continue;
+			/* overwrite the last byte, assumed to be the reply's '\n' */
+			buf[bytes - 1] = 0;
+			/* only trust the reply if timestamp and value both parsed */
+			if (sscanf(buf, "%lld.%ld %s", &sec, &nsec, vbuf) != 3)
+				continue;
+			ts.tv_sec = (time_t)sec;
+			ts.tv_nsec = nsec;
+#ifdef DEBUG
+			printf("[%s] GOT val: %s for key: %s.\n", node->id, vbuf, key);
+#endif
+			if (keep_newest(&recent_ts, &recent_val, &ts, vbuf))
+				success++;
+		} else { /* UDP */
+			/* XXX: NOT YET IMPLEMENTED */
+			if (sendto(node->peers[replica].socket, msg, strlen(msg), 0, (struct sockaddr *)&(node->peers[replica].s_addr), sizeof(struct sockaddr)) > 0)
+				success++;
+		}
+	}
+	free(msg);
+	if (success)
+		*val = recent_val;
+#ifdef DEBUG
+	printf("[%s] success: %d\n", node->id, success);
+#endif
+	/* a quorum is only useful if some replica actually supplied a value */
+	return (success >= r && recent_val != NULL);
+}
#ifndef DHT_H
#define DHT_H
+/* struct timespec, used by the timestamped storage callbacks below */
+#include <time.h>
#include <sys/socket.h>
#define DHT_USEUDP (1 << 0)
@@ -52,18 +53,17 @@ struct dht_peer {
struct dht_options {
uint64_t (*partitioner)(const char *);
int (*ht_init)(void **);
- int (*ht_put)(void *, const char *, const char *);
- int (*ht_get)(void *, const char *, char **);
+	/*
+	 * Storage callbacks keep a write timestamp per key: ht_put stores
+	 * (val, *ts) under key and ht_get hands both back; both return nonzero
+	 * on success.  NOTE(review): callers in this patch copy *val from
+	 * ht_get instead of freeing it -- presumably table-owned storage.
+	 */
+ int (*ht_put)(void *, const char *, const char *, struct timespec *);
+ int (*ht_get)(void *, const char *, char **, struct timespec **);
int (*ht_del)(void *, const char *);
};
int dht_init(struct dht_node *, const char *, int, int, uint32_t, struct dht_options *);
void dht_add_peer(struct dht_node *, const char *, const char *, int);
void dht_event_loop(struct dht_node *);
-int dht_put_tunable(struct dht_node *, const char *, const char *, int);
-int dht_get_tunable(int, const char *, char **, int);
-int dht_put(int, const char *);
-int dht_get(int, const char *, char **);
+/* store key/val stamped with *ts on the key's replicas; last int is tunable (presumably a write quorum) */
+int dht_put_tunable(struct dht_node *, const char *, const char *, struct timespec *, int);
+/* newest value of key among its replicas into *val (caller frees); 1 when at least r replicas answered */
+int dht_get_tunable(struct dht_node *, const char *, char **, int);
+/* NOTE(review): no definitions of dht_put()/dht_get() appear in this patch -- confirm they exist */
+int dht_put(struct dht_node *, const char *, const char *, struct timespec *);
+int dht_get(struct dht_node *, const char *, char **);
#endif
-
-REGRESS_TARGETS= run-regress-murmur3 run-regress-init run-regress-put run-regress-scale run-regress-buf
-SRCS= murmur3.c init.c put.c buf.c
-CLEANFILES= murmur3 init put scale buf
+# BSD make regress suite: one test program per source in SRCS, each run
+# by the matching run-regress-<name> target.
+REGRESS_TARGETS= run-regress-murmur3 run-regress-init \
+ run-regress-put run-regress-get \
+ run-regress-scale run-regress-buf
+SRCS= murmur3.c init.c put.c get.c scale.c buf.c
+CLEANFILES= murmur3 init put get scale buf
-CFLAGS+= -I${.CURDIR}/..
+CFLAGS+= -I${.CURDIR}/.. -g
LDFLAGS+= -ldht -lhashtab -lpthread -Bstatic -g
run-regress-murmur3: murmur3
@@ -14,6 +16,9 @@ run-regress-init: init
run-regress-put: put
@./put || true;
+# NOTE(review): "|| true" discards ./get's exit status, so failures only
+# show up in the test's printed output.
+run-regress-get: get
+ @./get || true;
+
run-regress-scale: scale
@./scale || true;
/*
- * Copyright (c) 2016 Mohamed Aslan <[email protected]>
+ * Copyright (c) 2017 Mohamed Aslan <[email protected]>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
#define N_REPLICAS 2
-#define MAX_KEYS 2
+/* presumably the capacity of each node's key/value fixture arrays (node "A" below fills all 5) */
+#define MAX_KEYS 5
struct v_node {
char *id;
char *ip;
@@ -38,97 +38,61 @@ struct v_node {
#define N_NODES 3
+/*
+ * Per-node fixtures.  node() pre-loads each node's local table with its
+ * n_keys pairs (presumably the pairs that node holds as a replica), and
+ * test_thread() then reads every pair in keys[]/vals[] back through
+ * dht_get_tunable(); unused slots are NULL.
+ */
static struct v_node vnodes[] = {
- {"A", "127.0.0.1", 5000, 2,
- {"hello", "world"},
- {"world", "hello"}
- },
- {"B", "127.0.0.1", 5001, 2,
- {"1hello", "world1"},
- {"world", "hello"}
- },
- {"C", "127.0.0.1", 5002, 2,
- {"12hello", "world12"},
- {"world", "hello"}
- }
-};
-
-#define MAX_RKEYS 5
-struct result {
- int n;
- char *keys[MAX_RKEYS];
- char *vals[MAX_RKEYS];
- int visited[MAX_RKEYS];
-};
-
-static struct result results[] = {
- {
- 5,
+ {"A", "127.0.0.1", 5000, 5,
{"hello", "1hello", "12hello", "world1", "world12"},
{"world", "world", "world", "hello", "hello"},
- {0, 0, 0, 0, 0}
},
- {
- 3,
- {"1hello", "world", "world1"},
- {"world", "hello", "hello"},
- {0, 0, 0}
+ {"B", "127.0.0.1", 5001, 3,
+ {"1hello", "world", "world1", NULL, NULL},
+ {"world", "hello", "hello", NULL, NULL},
},
- {
- 4,
- {"hello", "12hello", "world", "world12"},
- {"world", "world", "hello", "hello"},
- {0, 0, 0, 0}
+ {"C", "127.0.0.1", 5002, 4,
+ {"hello", "12hello", "world", "world12", NULL},
+ {"world", "world", "hello", "hello", NULL},
}
};
+/*
+ * Every key/value pair stored across the cluster; test_thread() reads each
+ * one back and compares against vals[].  Read-only and file-local, so the
+ * string literals are never reachable through a writable char *.
+ */
+#define N_KV 6
+static const char *const keys[N_KV] = {
+	"hello", "world", "1hello",
+	"world1", "12hello", "world12"
+};
+static const char *const vals[N_KV] = {
+	"world", "hello", "world",
+	"hello", "world", "hello"
+};
+
+/* argument for test_thread(): presumably the node's index into vnodes[], plus its dht_node */
struct args {
int nidx;
struct dht_node *node;
};
-int
-lfind(const char *key, const char *val, struct result *r)
-{
- int i;
-
- for (i = 0 ; i < r->n ; i++) {
- if (!strcmp(key, r->keys[i]) && !strcmp(val, r->vals[i]) && !r->visited[i]) {
- r->visited[i] = 1;
- return 1;
- }
- }
- return 0;
-}
-
void *
test_thread(void *arg)
{
int i, fail = 0;
- int me = ((struct args *)arg)->nidx;
struct dht_node *n = ((struct args *)arg)->node;
- struct hashtab *ht = (struct hashtab *)n->ht;
- char *key, *val;
- size_t key_size, val_size;
- size_t iter;
+ char *val;
while (!n->ready);
sleep(5);
- for (i = 0 ; i < vnodes[me].n_keys ; i++) {
- if (!dht_put_tunable(n, vnodes[me].keys[i], vnodes[me].vals[i], N_REPLICAS))
- printf("FAILED [%s] dht_put_tunable.\n", n->id);
- }
-
- sleep(5);
- HASHTAB_FOREACH(*ht, iter, key, key_size, val, val_size) {
- if (lfind(key, val, &results[me])) {
- printf("OK\n");
- } else {
+ /* read all keys/vals */
+ for (i = 0 ; i < N_KV ; i++) {
+ if (!dht_get_tunable(n, keys[i], &val, N_REPLICAS)) {
+ printf("FAILED [%s] dht_get_tunable.\n", n->id);
+ printf("for %s:%s\n", keys[i], vals[i]);
+ fail++;
+ continue;
+ }
+ if (strcmp(vals[i], val)) {
+ printf("FAILED [%s] dht_get_tunable, received bad val.\n", n->id);
fail++;
- printf("FAILED [%s] %s -> %s\n", n->id, key, val);
}
}
+ printf("OK\n");
+ sleep(5);
_exit(fail);
/* NOTREACHED */
@@ -141,12 +105,21 @@ node(int me)
int i;
struct dht_node *n;
struct args *a;
+ struct hashtab *ht;
+ struct timespec ts;
pthread_t tp, tt;
n = malloc(sizeof(struct dht_node));
if (!dht_init(n, vnodes[me].id, vnodes[me].port, N_REPLICAS, 0, NULL))
err(1, "dht_init");
+ /* populate hashtable */
+ ht = (struct hashtab *)n->ht;
+ (void)clock_gettime(CLOCK_REALTIME, &ts);
+ for (i = 0 ; i < vnodes[me].n_keys ; i++) {
+ n->opts->ht_put(ht, vnodes[me].keys[i], vnodes[me].vals[i], &ts);
+ }
+
for (i = 0 ; i < N_NODES ; i++)
if (i != me)
dht_add_peer(n, vnodes[i].id, vnodes[i].ip, vnodes[i].port);
#include <pthread.h>
#include <err.h>
#include <errno.h>
+#include <time.h>
#include <unistd.h>
#include <sys/wait.h>
@@ -108,6 +109,7 @@ test_thread(void *arg)
int me = ((struct args *)arg)->nidx;
struct dht_node *n = ((struct args *)arg)->node;
struct hashtab *ht = (struct hashtab *)n->ht;
+ struct timespec ts;
char *key, *val;
size_t key_size, val_size;
size_t iter;
@@ -115,8 +117,9 @@ test_thread(void *arg)
while (!n->ready);
sleep(5);
+ (void) clock_gettime(CLOCK_REALTIME, &ts);
for (i = 0 ; i < vnodes[me].n_keys ; i++) {
- if (!dht_put_tunable(n, vnodes[me].keys[i], vnodes[me].vals[i], N_REPLICAS))
+ if (!dht_put_tunable(n, vnodes[me].keys[i], vnodes[me].vals[i], &ts, N_REPLICAS))
printf("FAILED [%s] dht_put_tunable.\n", n->id);
}
#include <pthread.h>
#include <err.h>
#include <errno.h>
+#include <time.h>
#include <unistd.h>
#include <sys/wait.h>
@@ -56,14 +57,16 @@ test_thread(void *arg)
int me = ((struct args *)arg)->nidx;
struct dht_node *n = ((struct args *)arg)->node;
struct hashtab *ht = (struct hashtab *)n->ht;
+ struct timespec ts;
char *key, *val;
size_t val_size;
while (!n->ready);
sleep(5);
+ (void)clock_gettime(CLOCK_REALTIME, &ts);
for (i = 0 ; i < vnodes[me].n_keys ; i++) {
- if (!dht_put_tunable(n, vnodes[me].keys[i], vnodes[me].vals[i], N_REPLICAS))
+ if (!dht_put_tunable(n, vnodes[me].keys[i], vnodes[me].vals[i], &ts, N_REPLICAS))
printf("FAILED [%s] dht_put_tunable.\n", n->id);
}