[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33313 - in gnunet/src: include multicast psyc
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33313 - in gnunet/src: include multicast psyc |
Date: |
Sat, 17 May 2014 12:16:15 +0200 |
Author: tg
Date: 2014-05-17 12:16:15 +0200 (Sat, 17 May 2014)
New Revision: 33313
Modified:
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/include/gnunet_signatures.h
gnunet/src/multicast/gnunet-service-multicast.c
gnunet/src/multicast/multicast.h
gnunet/src/multicast/multicast_api.c
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc.h
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/test_psyc.c
Log:
multicast, psyc: client connections, join requests
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2014-05-17 04:47:33 UTC (rev
33312)
+++ gnunet/src/include/gnunet_protocols.h 2014-05-17 10:16:15 UTC (rev
33313)
@@ -2338,73 +2338,63 @@
#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
/**
- * C->S: Stop the origin.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
-
-/**
* C->S: Join group as a member.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 751
/**
- * C->S: Part the group.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
-
-/**
- * C<->S<->T: Multicast message from the origin to all members.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
-
-/**
- * C<->S<->T: Unicast request from a group member to the origin.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
-
-/**
* C<--S<->T: A peer wants to join the group.
*
* Unicast message to the origin or another group member.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST 752
/**
* C<->S<->T: Response to a join request.
*
* Unicast message from a group member to the peer wanting to join.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION
+#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION 753
/**
* A peer wants to part the group.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST 754
/**
* Acknowledgement sent in response to a part request.
*
* Unicast message from a group member to the peer wanting to part.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK
+#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK 755
/**
* Group terminated.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END
+#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END 756
/**
- *
+ * C<->S<->T: Multicast message from the origin to all members.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 757
/**
- *
+ * C<->S<->T: Unicast request from a group member to the origin.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758
+/**
+ * C<->S<->T: Replay request from a group member to another member.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759
+/**
+ * C<->S<->T: Cancellation of a replay request.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL 760
+
+
/*******************************************************************************
* SECRETSHARING message types
******************************************************************************/
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2014-05-17 04:47:33 UTC (rev
33312)
+++ gnunet/src/include/gnunet_psyc_service.h 2014-05-17 10:16:15 UTC (rev
33313)
@@ -620,9 +620,7 @@
* @param message_cb Function to invoke on message parts received from the
* channel, typically at least contains method handlers for @e join and
* @e part.
- * @param join_cb function invoked once we have joined with the current
- * message ID of the channel
- * @param slave_joined_cb Function to invoke when a peer wants to join.
+ * @param slave_joined_cb Function invoked once we have joined the channel.
* @param cls Closure for @a message_cb and @a slave_joined_cb.
* @param method_name Method name for the join request.
* @param env Environment containing transient variables for the request, or
NULL.
@@ -638,7 +636,6 @@
uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_JoinCallback join_cb,
GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
void *cls,
const char *method_name,
Modified: gnunet/src/include/gnunet_signatures.h
===================================================================
--- gnunet/src/include/gnunet_signatures.h 2014-05-17 04:47:33 UTC (rev
33312)
+++ gnunet/src/include/gnunet_signatures.h 2014-05-17 10:16:15 UTC (rev
33313)
@@ -137,7 +137,7 @@
#define GNUNET_SIGNATURE_PURPOSE_REGEX_ACCEPT 18
/**
- * Signature of a multicast message.
+ * Signature of a multicast message sent by the origin.
*/
#define GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE 19
@@ -166,7 +166,12 @@
*/
#define GNUNET_SIGNATURE_PURPOSE_SECRETSHARING_DECRYPTION 23
+/**
+ * Signature of a multicast request sent by a member.
+ */
+#define GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST 24
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c 2014-05-17 04:47:33 UTC
(rev 33312)
+++ gnunet/src/multicast/gnunet-service-multicast.c 2014-05-17 10:16:15 UTC
(rev 33313)
@@ -46,22 +46,40 @@
/**
* All connected origins.
- * Group's pub_key_hash -> struct Group
+ * Group's pub_key_hash -> struct Origin
*/
static struct GNUNET_CONTAINER_MultiHashMap *origins;
/**
* All connected members.
- * Group's pub_key_hash -> struct Group
+ * Group's pub_key_hash -> struct Member
*/
static struct GNUNET_CONTAINER_MultiHashMap *members;
/**
+ * Connected members per group.
+ * Group's pub_key_hash -> Member's pub_key -> struct Member
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *group_members;
+
+
+/**
+ * List of connected clients.
+ */
+struct ClientList
+{
+ struct ClientList *prev;
+ struct ClientList *next;
+ struct GNUNET_SERVER_Client *client;
+};
+
+/**
* Common part of the client context for both an origin and member.
*/
struct Group
{
- struct GNUNET_SERVER_Client *client;
+ struct ClientList *clients_head;
+ struct ClientList *clients_tail;
/**
* Public key of the group.
@@ -117,6 +135,29 @@
struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
/**
+ * Public key of the member.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+
+ /**
+ * Hash of @a pub_key.
+ */
+ struct GNUNET_HashCode pub_key_hash;
+
+ /**
+ * Join request sent to the origin / members.
+ */
+ struct GNUNET_MULTICAST_JoinRequest *join_request;
+
+ /**
+ * Join decision sent in reply to our request.
+ *
+ * Only a positive decision is stored here, in case of a negative decision
the
+ * client is disconnected.
+ */
+ struct MulticastJoinDecisionMessage *join_decision;
+
+ /**
* Last request fragment ID sent to the origin.
*/
uint64_t max_fragment_id;
@@ -135,23 +176,161 @@
/* FIXME: do clean up here */
}
+/**
+ * Clean up origin data structures after a client disconnected.
+ */
+static void
+cleanup_origin (struct Origin *orig)
+{
+ struct Group *grp = &orig->grp;
+ GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
+}
+
/**
- * Iterator callback for sending a message to clients.
+ * Clean up member data structures after a client disconnected.
*/
+static void
+cleanup_member (struct Member *mem)
+{
+ struct Group *grp = &mem->grp;
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
+ &grp->pub_key_hash);
+ GNUNET_assert (NULL != grp_mem);
+ GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
+
+ if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
+ {
+ GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
+ grp_mem);
+ GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
+ }
+ GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
+}
+
+
+/**
+ * Clean up group data structures after a client disconnected.
+ */
+static void
+cleanup_group (struct Group *grp)
+{
+ (GNUNET_YES == grp->is_origin)
+ ? cleanup_origin ((struct Origin *) grp)
+ : cleanup_member ((struct Member *) grp);
+
+ GNUNET_free (grp);
+}
+
+
+/**
+ * Called whenever a client is disconnected.
+ *
+ * Frees our resources associated with that client.
+ *
+ * @param cls Closure.
+ * @param client Client handle.
+ */
+static void
+client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+ if (NULL == client)
+ return;
+
+ struct Group *grp
+ = GNUNET_SERVER_client_get_user_context (client, struct Group);
+
+ if (NULL == grp)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p User context is NULL in client_disconnect()\n", grp);
+ GNUNET_assert (0);
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Client (%s) disconnected from group %s\n",
+ grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
+ GNUNET_h2s (&grp->pub_key_hash));
+
+ struct ClientList *cl = grp->clients_head;
+ while (NULL != cl)
+ {
+ if (cl->client == client)
+ {
+ GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
+ GNUNET_free (cl);
+ break;
+ }
+ cl = cl->next;
+ }
+
+ if (NULL == grp->clients_head)
+ { /* Last client disconnected. */
+#if FIXME
+ if (NULL != grp->tmit_head)
+ { /* Send pending messages via CADET before cleanup. */
+ transmit_message (grp);
+ }
+ else
+#endif
+ {
+ cleanup_group (grp);
+ }
+ }
+}
+
+
+/**
+ * Send message to all clients connected to the group.
+ */
+static void
+message_to_clients (const struct Group *grp,
+ const struct GNUNET_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Sending message to clients.\n", grp);
+
+ struct ClientList *cl = grp->clients_head;
+ while (NULL != cl)
+ {
+ GNUNET_SERVER_notification_context_add (nc, cl->client);
+ GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg,
GNUNET_NO);
+ cl = cl->next;
+ }
+}
+
+
+/**
+ * Iterator callback for sending a message to origin clients.
+ */
static int
-message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *group)
+origin_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+ void *origin)
{
const struct GNUNET_MessageHeader *msg = cls;
- struct Group *grp = group;
+ struct Member *orig = origin;
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Sending message to client.\n", grp);
+ message_to_clients (&orig->grp, msg);
+ return GNUNET_YES;
+}
- GNUNET_SERVER_notification_context_add (nc, grp->client);
- GNUNET_SERVER_notification_context_unicast (nc, grp->client, msg, GNUNET_NO);
+/**
+ * Iterator callback for sending a message to member clients.
+ */
+static int
+member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+ void *member)
+{
+ const struct GNUNET_MessageHeader *msg = cls;
+ struct Member *mem = member;
+
+ if (NULL != mem->join_decision)
+ { /* Only send message to admitted members */
+ message_to_clients (&mem->grp, msg);
+ }
return GNUNET_YES;
}
@@ -167,10 +346,10 @@
{
if (origins != NULL)
GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- message_callback, (void *)
msg);
+ origin_message_cb, (void *)
msg);
if (members != NULL)
GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- message_callback, (void *)
msg);
+ member_message_cb, (void *)
msg);
}
@@ -185,7 +364,7 @@
{
if (origins != NULL)
GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- message_callback, (void *)
msg);
+ origin_message_cb, (void *)
msg);
}
@@ -199,38 +378,47 @@
const struct MulticastOriginStartMessage *
msg = (const struct MulticastOriginStartMessage *) m;
- struct Origin *orig = GNUNET_new (struct Origin);
- orig->priv_key = msg->group_key;
+ struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+ struct GNUNET_HashCode pub_key_hash;
- struct Group *grp = &orig->grp;
- grp->is_origin = GNUNET_YES;
- grp->client = client;
+ GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
+ GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
- GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
&grp->pub_key_hash);
+ struct Origin *
+ orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
+ struct Group *grp;
+ if (NULL == orig)
+ {
+ orig = GNUNET_new (struct Origin);
+ orig->priv_key = msg->group_key;
+ grp = &orig->grp;
+ grp->is_origin = GNUNET_YES;
+ grp->pub_key = pub_key;
+ grp->pub_key_hash = pub_key_hash;
+
+ GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+ else
+ {
+ grp = &orig->grp;
+ }
+
+ struct ClientList *cl = GNUNET_new (struct ClientList);
+ cl->client = client;
+ GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Client connected as origin to group %s.\n",
orig, GNUNET_h2s (&grp->pub_key_hash));
GNUNET_SERVER_client_set_user_context (client, grp);
- GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
- * Handle a client stopping an origin.
- */
-static void
-handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
-{
-}
-
-
-/**
* Handle a connecting client joining a group.
*/
static void
@@ -240,34 +428,113 @@
struct MulticastMemberJoinMessage *
msg = (struct MulticastMemberJoinMessage *) m;
- struct Member *mem = GNUNET_new (struct Member);
- mem->priv_key = msg->member_key;
+ struct GNUNET_CRYPTO_EddsaPublicKey mem_pub_key;
+ struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
- struct Group *grp = &mem->grp;
- grp->is_origin = GNUNET_NO;
- grp->client = client;
- grp->pub_key = msg->group_key;
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
&grp->pub_key_hash);
+ GNUNET_CRYPTO_eddsa_key_get_public (&msg->member_key, &mem_pub_key);
+ GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
+ GNUNET_CRYPTO_hash (&msg->group_key, sizeof (msg->group_key), &pub_key_hash);
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
+ struct Member *mem = NULL;
+ struct Group *grp;
+
+ if (NULL == grp_mem)
+ {
+ grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ GNUNET_CONTAINER_multihashmap_put (group_members, &pub_key_hash, grp_mem,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ }
+ else
+ {
+ mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
+ }
+
+ if (NULL == mem)
+ {
+ mem = GNUNET_new (struct Member);
+ mem->priv_key = msg->member_key;
+ mem->pub_key = mem_pub_key;
+ mem->pub_key_hash = mem_pub_key_hash;
+
+ grp = &mem->grp;
+ grp->is_origin = GNUNET_NO;
+ grp->pub_key = msg->group_key;
+ grp->pub_key_hash = pub_key_hash;
+
+ GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem_pub_key_hash, mem,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ }
+ else
+ {
+ grp = &mem->grp;
+ }
+
+ struct ClientList *cl = GNUNET_new (struct ClientList);
+ cl->client = client;
+ GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Client connected as member to group %s.\n",
mem, GNUNET_h2s (&grp->pub_key_hash));
GNUNET_SERVER_client_set_user_context (client, grp);
- GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
+ if (NULL != mem->join_decision)
+ { /* Already got a join decision, send it to client. */
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client,
+ (struct GNUNET_MessageHeader *)
+ mem->join_decision,
+ GNUNET_NO);
+ }
+ else if (grp->clients_head == grp->clients_tail)
+ { /* First client, send join request. */
+ struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *)
&msg[1];
+ uint32_t relay_count = ntohs (msg->relay_count);
+ struct GNUNET_MessageHeader *
+ join_req = ((struct GNUNET_MessageHeader *)
+ ((char *) &msg[1]) + relay_count * sizeof (*relays));
+ uint16_t join_req_size = ntohs (join_req->size);
-/**
- * Handle a client parting a group.
- */
-static void
-handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
-{
+ struct MulticastJoinRequestMessage *
+ req = GNUNET_malloc (sizeof (*req) + join_req_size);
+ req->header.size = htons (sizeof (*req) + join_req_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
+ req->group_key = grp->pub_key;
+ GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &req->member_key);
+ memcpy (&req[1], join_req, join_req_size);
+ req->purpose.size = htonl (sizeof (*req) + join_req_size
+ - sizeof (req->header)
+ - sizeof (req->signature));
+ req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
+
+ if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
+ &req->signature))
+ {
+ /* FIXME: handle error */
+ GNUNET_assert (0);
+ }
+
+ if (NULL != mem->join_request)
+ GNUNET_free (mem->join_request);
+ mem->join_request = req;
+
+ if (GNUNET_YES
+ == GNUNET_CONTAINER_multihashmap_contains (origins,
&grp->pub_key_hash))
+ { /* Local origin */
+ message_to_origin (grp, (struct GNUNET_MessageHeader *)
mem->join_request);
+ }
+ else
+ {
+ /* FIXME: send join request to remote origin / members */
+ }
+ }
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -296,7 +563,7 @@
&msg->signature))
{
/* FIXME: handle error */
- return;
+ GNUNET_assert (0);
}
/* FIXME: send to remote members */
@@ -327,18 +594,24 @@
- sizeof (req->header)
- sizeof (req->member_key)
- sizeof (req->signature));
- req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
+ req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
&req->signature))
{
/* FIXME: handle error */
- return;
+ GNUNET_assert (0);
}
- /* FIXME: send to remote origin */
-
- message_to_origin (grp, m);
+ if (GNUNET_YES
+ == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash))
+ { /* Local origin */
+ message_to_origin (grp, m);
+ }
+ else
+ {
+ /* FIXME: send to remote origin */
+ }
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -357,15 +630,9 @@
{ &handle_origin_start, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
- { &handle_origin_stop, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP, 0 },
-
{ &handle_member_join, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
- { &handle_member_part, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART, 0 },
-
{ &handle_multicast_message, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
@@ -379,9 +646,11 @@
stats = GNUNET_STATISTICS_create ("multicast", cfg);
origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
nc = GNUNET_SERVER_notification_context_create (server, 1);
GNUNET_SERVER_add_handlers (server, handlers);
+ GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
NULL);
}
Modified: gnunet/src/multicast/multicast.h
===================================================================
--- gnunet/src/multicast/multicast.h 2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/multicast/multicast.h 2014-05-17 10:16:15 UTC (rev 33313)
@@ -33,7 +33,7 @@
/**
* Header of a join request sent to the origin or another member.
*/
-struct GNUNET_MULTICAST_JoinRequest
+struct MulticastJoinRequestMessage
{
/**
* Header for the join request.
@@ -67,7 +67,7 @@
*/
struct GNUNET_PeerIdentity member_peer;
- /* Followed by request body. */
+ /* Followed by struct GNUNET_MessageHeader join_request */
};
@@ -97,9 +97,9 @@
*/
uint32_t relay_count;
- /* followed by 'relay_count' peer identities */
+ /* Followed by relay_count peer identities */
- /* followed by the join response message */
+ /* Followed by the join response message */
};
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2014-05-17 04:47:33 UTC (rev
33312)
+++ gnunet/src/multicast/multicast_api.c 2014-05-17 10:16:15 UTC (rev
33313)
@@ -196,6 +196,17 @@
*/
struct GNUNET_MULTICAST_JoinHandle
{
+ struct GNUNET_MULTICAST_Group *group;
+
+ /**
+ * Public key of the joining member.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+
+ /**
+ * Peer identity of the joining member.
+ */
+ struct GNUNET_PeerIdentity member_peer;
};
@@ -437,8 +448,7 @@
* Iterator callback for calling message callbacks for all groups.
*/
static int
-message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *group)
+message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group)
{
const struct GNUNET_MessageHeader *msg = cls;
struct GNUNET_MULTICAST_Group *grp = group;
@@ -456,32 +466,10 @@
/**
- * Handle a multicast message from the service.
- *
- * Call message callbacks of all origins and members of the destination group.
- *
- * @param grp Destination group of the message.
- * @param msg The message.
- */
-static void
-handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
- const struct GNUNET_MULTICAST_MessageHeader *msg)
-{
- if (origins != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- message_callback, (void *)
msg);
- if (members != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- message_callback, (void *)
msg);
-}
-
-
-/**
* Iterator callback for calling request callbacks of origins.
*/
static int
-request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
- void *origin)
+request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void
*origin)
{
const struct GNUNET_MULTICAST_RequestHeader *req = cls;
struct GNUNET_MULTICAST_Origin *orig = origin;
@@ -497,20 +485,26 @@
/**
- * Handle a multicast request from the service.
- *
- * Call request callbacks of all origins of the destination group.
- *
- * @param grp Destination group of the message.
- * @param msg The message.
+ * Iterator callback for calling join request callbacks of origins.
*/
-static void
-handle_multicast_request (struct GNUNET_MULTICAST_Group *grp,
- const struct GNUNET_MULTICAST_RequestHeader *req)
+static int
+join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+ void *group)
{
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- request_callback, (void *)
req);
+ const struct MulticastJoinRequestMessage *req = cls;
+ struct GNUNET_MULTICAST_Group *grp = group;
+
+ struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+ jh->group = grp;
+ jh->member_key = req->member_key;
+ jh->member_peer = req->member_peer;
+
+ const struct GNUNET_MessageHeader *msg = NULL;
+ if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
+ msg =(const struct GNUNET_MessageHeader *) &req[1];
+
+ grp->join_cb (grp->cb_cls, &req->member_key, msg, jh);
+ return GNUNET_YES;
}
@@ -551,22 +545,31 @@
size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
break;
+ case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
+ size_min = sizeof (struct MulticastJoinRequestMessage);
+ break;
+
default:
GNUNET_break_op (0);
- return;
+ type = 0;
}
if (! ((0 < size_eq && size == size_eq)
|| (0 < size_min && size_min <= size)))
{
GNUNET_break_op (0);
- return;
+ type = 0;
}
switch (type)
{
case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- handle_multicast_message (grp, (struct GNUNET_MULTICAST_MessageHeader *)
msg);
+ if (origins != NULL)
+ GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+ message_cb, (void *) msg);
+ if (members != NULL)
+ GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
+ message_cb, (void *) msg);
break;
case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
@@ -576,12 +579,19 @@
break;
}
- handle_multicast_request (grp, (struct GNUNET_MULTICAST_RequestHeader *)
msg);
+ if (NULL != origins)
+ GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+ request_cb, (void *) msg);
break;
- default:
- GNUNET_break_op (0);
- return;
+ case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
+ if (NULL != origins)
+ GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+ join_request_cb, (void *)
msg);
+ if (NULL != members)
+ GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
+ join_request_cb, (void *)
msg);
+ break;
}
if (NULL != grp->client)
@@ -621,6 +631,7 @@
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader
*join_response)
{
+ GNUNET_free (jh);
return NULL;
}
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2014-05-17 04:47:33 UTC (rev
33312)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2014-05-17 10:16:15 UTC (rev
33313)
@@ -59,17 +59,23 @@
/**
* All connected masters.
- * Channel's pub_key_hash -> struct Channel
+ * Channel's pub_key_hash -> struct Master
*/
static struct GNUNET_CONTAINER_MultiHashMap *masters;
/**
* All connected slaves.
- * Channel's pub_key_hash -> struct Channel
+ * Channel's pub_key_hash -> struct Slave
*/
static struct GNUNET_CONTAINER_MultiHashMap *slaves;
+/**
+ * Connected slaves per channel.
+ * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
+
/**
* Message in the transmission queue.
*/
@@ -78,6 +84,8 @@
struct TransmitMessage *prev;
struct TransmitMessage *next;
+ struct GNUNET_SERVER_Client *client;
+
/**
* ID assigned to the message.
*/
@@ -164,11 +172,23 @@
/**
+ * List of connected clients.
+ */
+struct ClientList
+{
+ struct ClientList *prev;
+ struct ClientList *next;
+ struct GNUNET_SERVER_Client *client;
+};
+
+
+/**
* Common part of the client context for both a channel master and slave.
*/
struct Channel
{
- struct GNUNET_SERVER_Client *client;
+ struct ClientList *clients_head;
+ struct ClientList *clients_tail;
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
@@ -316,6 +336,16 @@
struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
/**
+ * Public key of the slave.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+
+ /**
+ * Hash of @a pub_key.
+ */
+ struct GNUNET_HashCode pub_key_hash;
+
+ /**
* Handle for the multicast member.
*/
struct GNUNET_MULTICAST_Member *member;
@@ -378,30 +408,62 @@
}
+/**
+ * Clean up master data structures after a client disconnected.
+ */
static void
-client_cleanup (struct Channel *ch)
+cleanup_master (struct Master *mst)
{
- /* FIXME: fragment_cache_clear */
+ struct Channel *ch = &mst->channel;
- if (ch->is_master)
+ if (NULL != mst->origin)
+ GNUNET_MULTICAST_origin_stop (mst->origin);
+ GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
+}
+
+
+/**
+ * Clean up slave data structures after a client disconnected.
+ */
+static void
+cleanup_slave (struct Slave *slv)
+{
+ struct Channel *ch = &slv->channel;
+ struct GNUNET_CONTAINER_MultiHashMap *
+ ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
+ &ch->pub_key_hash);
+ GNUNET_assert (NULL != ch_slv);
+ GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
+
+ if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
{
- struct Master *mst = (struct Master *) ch;
- if (NULL != mst->origin)
- GNUNET_MULTICAST_origin_stop (mst->origin);
- GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
+ GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
+ ch_slv);
+ GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
}
- else
- {
- struct Slave *slv = (struct Slave *) ch;
- if (NULL != slv->join_req)
- GNUNET_free (slv->join_req);
- if (NULL != slv->relays)
- GNUNET_free (slv->relays);
- if (NULL != slv->member)
- GNUNET_MULTICAST_member_part (slv->member);
- GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
- }
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
+ if (NULL != slv->join_req)
+ GNUNET_free (slv->join_req);
+ if (NULL != slv->relays)
+ GNUNET_free (slv->relays);
+ if (NULL != slv->member)
+ GNUNET_MULTICAST_member_part (slv->member);
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
+}
+
+
+/**
+ * Clean up channel data structures after a client disconnected.
+ */
+static void
+cleanup_channel (struct Channel *ch)
+{
+ /* FIXME: fragment_cache_clear */
+
+ (GNUNET_YES == ch->is_master)
+ ? cleanup_master ((struct Master *) ch)
+ : cleanup_slave ((struct Slave *) ch);
GNUNET_free (ch);
}
@@ -421,7 +483,10 @@
struct Channel *ch
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Client (%s) disconnected from channel %s\n",
+ ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
+ GNUNET_h2s (&ch->pub_key_hash));
if (NULL == ch)
{
@@ -431,29 +496,112 @@
return;
}
- ch->disconnected = GNUNET_YES;
+ struct ClientList *cl = ch->clients_head;
+ while (NULL != cl)
+ {
+ if (cl->client == client)
+ {
+ GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
+ GNUNET_free (cl);
+ break;
+ }
+ cl = cl->next;
+ }
- /* Send pending messages to multicast before cleanup. */
- if (NULL != ch->tmit_head)
+ if (NULL == ch->clients_head)
+ { /* Last client disconnected. */
+ if (NULL != ch->tmit_head)
+ { /* Send pending messages to multicast before cleanup. */
+ transmit_message (ch);
+ }
+ else
+ {
+ cleanup_channel (ch);
+ }
+ }
+}
+
+
+/**
+ * Send message to all clients connected to the channel.
+ */
+static void
+msg_to_clients (const struct Channel *ch,
+ const struct GNUNET_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Sending message to clients.\n", ch);
+
+ struct ClientList *cl = ch->clients_head;
+ while (NULL != cl)
{
- transmit_message (ch);
+ GNUNET_SERVER_notification_context_add (nc, cl->client);
+ GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg,
GNUNET_NO);
+ cl = cl->next;
}
+}
+
+
+/**
+ * Closure for join_mem_test_cb()
+ */
+struct JoinMemTestCls
+{
+ struct Channel *ch;
+ struct GNUNET_MULTICAST_JoinHandle *jh;
+ struct MasterJoinRequest *master_join_req;
+};
+
+
+/**
+ * Membership test result callback used for join requests.m
+ */
+static void
+join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
+{
+ struct JoinMemTestCls *jcls = cls;
+
+ if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
+ { /* Pass on join request to client if this is a master channel */
+ msg_to_clients (jcls->ch,
+ (struct GNUNET_MessageHeader *) jcls->master_join_req);
+ }
else
{
- client_cleanup (ch);
+ // FIXME: relays
+ GNUNET_MULTICAST_join_decision(jcls->jh, result, 0, NULL, NULL);
}
+ GNUNET_free (jcls->master_join_req);
+ GNUNET_free (jcls);
}
/**
- * Master receives a join request from a slave.
+ * Incoming join request from multicast.
*/
static void
join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
const struct GNUNET_MessageHeader *join_req,
struct GNUNET_MULTICAST_JoinHandle *jh)
{
+ struct Channel *ch = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
+ uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
+ struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) +
join_req_size);
+ req->header.size = htons (sizeof (*req) + join_req_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
+ req->slave_key = *slave_key;
+ memcpy (&req[1], join_req, join_req_size);
+
+ struct JoinMemTestCls *jcls = GNUNET_malloc (sizeof (*jcls));
+ jcls->ch = ch;
+ jcls->jh = jh;
+ jcls->master_join_req = req;
+
+ GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
+ ch->max_message_id, 0,
+ &join_mem_test_cb, jcls);
}
@@ -474,6 +622,7 @@
struct GNUNET_MULTICAST_ReplayHandle *rh)
{
+
}
@@ -497,35 +646,6 @@
}
-static void
-message_to_client (struct Channel *ch,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg)
-{
- uint16_t size = ntohs (mmsg->header.size);
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Sending message to client. "
- "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
- ch, GNUNET_ntohll (mmsg->fragment_id),
- GNUNET_ntohll (mmsg->message_id));
-
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = mmsg->message_id;
-
- memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
-
- GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client,
- (const struct
GNUNET_MessageHeader *) pmsg,
- GNUNET_NO);
- GNUNET_free (pmsg);
-}
-
-
/**
* Convert an uint64_t in network byte order to a HashCode
* that can be used as key in a MultiHashMap
@@ -564,6 +684,34 @@
/**
+ * Send multicast message to all clients connected to the channel.
+ */
+static void
+mmsg_to_clients (struct Channel *ch,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+ uint16_t size = ntohs (mmsg->header.size);
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending message to client. "
+ "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+ ch, GNUNET_ntohll (mmsg->fragment_id),
+ GNUNET_ntohll (mmsg->message_id));
+
+ pmsg = GNUNET_malloc (psize);
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = mmsg->message_id;
+
+ memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+ msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
+ GNUNET_free (pmsg);
+}
+
+
+/**
* Insert a multicast message fragment into the queue belonging to the message.
*
* @param ch Channel.
@@ -752,7 +900,7 @@
{
if (GNUNET_NO == drop)
{
- message_to_client (ch, cache_entry->mmsg);
+ mmsg_to_clients (ch, cache_entry->mmsg);
}
if (cache_entry->ref_count <= 1)
{
@@ -997,11 +1145,7 @@
pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
memcpy (&pmsg[1], &req[1], size - sizeof (*req));
-
- GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client,
- (const struct
GNUNET_MessageHeader *) pmsg,
- GNUNET_NO);
+ msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
GNUNET_free (pmsg);
break;
}
@@ -1025,11 +1169,11 @@
struct Master *mst = cls;
struct Channel *ch = &mst->channel;
- struct CountersResult *res = GNUNET_malloc (sizeof (*res));
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
- res->header.size = htons (sizeof (*res));
- res->result_code = htonl (result);
- res->max_message_id = GNUNET_htonll (max_message_id);
+ struct CountersResult res;
+ res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+ res.header.size = htons (sizeof (res));
+ res.result_code = htonl (result);
+ res.max_message_id = GNUNET_htonll (max_message_id);
if (GNUNET_OK == result || GNUNET_NO == result)
{
@@ -1053,10 +1197,7 @@
ch, result, GNUNET_h2s (&ch->pub_key_hash));
}
- GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
- GNUNET_NO);
- GNUNET_free (res);
+ msg_to_clients (ch, &res.header);
}
@@ -1071,11 +1212,11 @@
struct Slave *slv = cls;
struct Channel *ch = &slv->channel;
- struct CountersResult *res = GNUNET_malloc (sizeof (*res));
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
- res->header.size = htons (sizeof (*res));
- res->result_code = htonl (result);
- res->max_message_id = GNUNET_htonll (max_message_id);
+ struct CountersResult res;
+ res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+ res.header.size = htons (sizeof (res));
+ res.result_code = htonl (result);
+ res.max_message_id = GNUNET_htonll (max_message_id);
if (GNUNET_OK == result || GNUNET_NO == result)
{
@@ -1099,10 +1240,7 @@
ch, result, GNUNET_h2s (&ch->pub_key_hash));
}
- GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
- GNUNET_NO);
- GNUNET_free (res);
+ msg_to_clients (ch, &res.header);
}
@@ -1125,25 +1263,55 @@
const struct MasterStartRequest *req
= (const struct MasterStartRequest *) msg;
- struct Master *mst = GNUNET_new (struct Master);
- mst->policy = ntohl (req->policy);
- mst->priv_key = req->channel_key;
+ struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+ struct GNUNET_HashCode pub_key_hash;
- struct Channel *ch = &mst->channel;
- ch->client = client;
- ch->is_master = GNUNET_YES;
- GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
- GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
- channel_init (ch);
+ GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
+ GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
+ struct Master *
+ mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
+ struct Channel *ch;
+
+ if (NULL == mst)
+ {
+ mst = GNUNET_new (struct Master);
+ mst->policy = ntohl (req->policy);
+ mst->priv_key = req->channel_key;
+
+ ch = &mst->channel;
+ ch->is_master = GNUNET_YES;
+ ch->pub_key = pub_key;
+ ch->pub_key_hash = pub_key_hash;
+ channel_init (ch);
+
+ GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb,
mst);
+ }
+ else
+ {
+ ch = &mst->channel;
+
+ struct CountersResult res;
+ res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+ res.header.size = htons (sizeof (res));
+ res.result_code = htonl (GNUNET_OK);
+ res.max_message_id = GNUNET_htonll (mst->max_message_id);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
+ GNUNET_NO);
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Master connected to channel %s.\n",
+ "%p Client connected as master to channel %s.\n",
mst, GNUNET_h2s (&ch->pub_key_hash));
- GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
+ struct ClientList *cl = GNUNET_new (struct ClientList);
+ cl->client = client;
+ GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
- GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_SERVER_client_set_user_context (client, ch);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1158,37 +1326,82 @@
{
const struct SlaveJoinRequest *req
= (const struct SlaveJoinRequest *) msg;
- struct Slave *slv = GNUNET_new (struct Slave);
- slv->priv_key = req->slave_key;
- slv->origin = req->origin;
- slv->relay_count = ntohl (req->relay_count);
- if (0 < slv->relay_count)
+
+ struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
+ struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
+
+ GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
+ GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
+ GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key),
&pub_key_hash);
+
+ struct GNUNET_CONTAINER_MultiHashMap *
+ ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
+ struct Slave *slv = NULL;
+ struct Channel *ch;
+
+ if (NULL == ch_slv)
{
- const struct GNUNET_PeerIdentity *relays
- = (const struct GNUNET_PeerIdentity *) &req[1];
- slv->relays
- = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
- uint32_t i;
- for (i = 0; i < slv->relay_count; i++)
- memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+ ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
+ else
+ {
+ slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
+ }
- struct Channel *ch = &slv->channel;
- ch->client = client;
- ch->is_master = GNUNET_NO;
- ch->pub_key = req->channel_key;
- GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
- &ch->pub_key_hash);
- channel_init (ch);
+ if (NULL == slv)
+ {
+ slv = GNUNET_new (struct Slave);
+ slv->priv_key = req->slave_key;
+ slv->origin = req->origin;
+ slv->relay_count = ntohl (req->relay_count);
+ if (0 < slv->relay_count)
+ {
+ const struct GNUNET_PeerIdentity *relays
+ = (const struct GNUNET_PeerIdentity *) &req[1];
+ slv->relays
+ = GNUNET_malloc (slv->relay_count * sizeof (struct
GNUNET_PeerIdentity));
+ uint32_t i;
+ for (i = 0; i < slv->relay_count; i++)
+ memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+ }
+ ch = &slv->channel;
+ ch->is_master = GNUNET_NO;
+ ch->pub_key = req->channel_key;
+ ch->pub_key_hash = pub_key_hash;
+ channel_init (ch);
+
+ GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb,
slv);
+ }
+ else
+ {
+ ch = &slv->channel;
+
+ struct CountersResult res;
+ res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+ res.header.size = htons (sizeof (res));
+ res.result_code = htonl (GNUNET_OK);
+ res.max_message_id = GNUNET_htonll (ch->max_message_id);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
+ GNUNET_NO);
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Slave connected to channel %s.\n",
+ "%p Client connected as slave to channel %s.\n",
slv, GNUNET_h2s (&ch->pub_key_hash));
- GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
+ struct ClientList *cl = GNUNET_new (struct ClientList);
+ cl->client = client;
+ GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
- GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_SERVER_client_set_user_context (client, &slv->channel);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1202,14 +1415,15 @@
* @param ch The channel struct for the client.
*/
static void
-send_message_ack (struct Channel *ch)
+send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
{
struct GNUNET_MessageHeader res;
res.size = htons (sizeof (res));
res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
- GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO);
+ /* FIXME */
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
}
@@ -1236,12 +1450,13 @@
*data_size = tmit_msg->size;
memcpy (data, &tmit_msg[1], *data_size);
+ int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+ if (NULL != tmit_msg->client)
+ send_message_ack (ch, tmit_msg->client);
+
GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
GNUNET_free (tmit_msg);
- int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
- send_message_ack (ch);
-
if (0 == ch->tmit_task)
{
if (NULL != ch->tmit_head)
@@ -1251,7 +1466,7 @@
else if (ch->disconnected)
{
/* FIXME: handle partial message (when still in_transmit) */
- client_cleanup (ch);
+ cleanup_channel (ch);
}
}
@@ -1394,12 +1609,15 @@
static void
-queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg,
+queue_message (struct Channel *ch,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg,
uint16_t first_ptype, uint16_t last_ptype)
{
uint16_t size = ntohs (msg->size) - sizeof (*msg);
struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
memcpy (&tmit_msg[1], &msg[1], size);
+ tmit_msg->client = client;
tmit_msg->size = size;
tmit_msg->state = ch->tmit_state;
@@ -1414,7 +1632,7 @@
static void
-transmit_error (struct Channel *ch)
+transmit_error (struct Channel *ch, struct GNUNET_SERVER_Client *client)
{
uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
@@ -1422,7 +1640,7 @@
msg.size = ntohs (sizeof (msg));
msg.type = ntohs (type);
- queue_message (ch, &msg, type, type);
+ queue_message (ch, client, &msg, type, type);
transmit_message (ch);
/* FIXME: cleanup */
@@ -1458,7 +1676,7 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
GNUNET_break (0);
- transmit_error (ch);
+ transmit_error (ch, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
@@ -1472,12 +1690,12 @@
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%p Received invalid message part from client.\n", ch);
GNUNET_break (0);
- transmit_error (ch);
+ transmit_error (ch, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- queue_message (ch, msg, first_ptype, last_ptype);
+ queue_message (ch, client, msg, first_ptype, last_ptype);
transmit_message (ch);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1581,6 +1799,7 @@
stats = GNUNET_STATISTICS_create ("psyc", cfg);
masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
nc = GNUNET_SERVER_notification_context_create (server, 1);
GNUNET_SERVER_add_handlers (server, handlers);
Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h 2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/psyc/psyc.h 2014-05-17 10:16:15 UTC (rev 33313)
@@ -227,6 +227,21 @@
};
+struct MasterJoinRequest
+{
+ /**
+ * Types:
+ * - GNUNET_MESSAGE_TYPE_PSYC_MASTER_JOIN_REQUEST
+ */
+ struct GNUNET_MessageHeader header;
+ /**
+ * Public key of the joining slave.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
+
+ /* Followed by struct GNUNET_MessageHeader join_request */
+};
+
GNUNET_NETWORK_STRUCT_END
#endif
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/psyc/psyc_api.c 2014-05-17 10:16:15 UTC (rev 33313)
@@ -126,13 +126,8 @@
GNUNET_PSYC_MessageCallback hist_message_cb;
/**
- * Join handler callback.
+ * Closure for @a message_cb.
*/
- GNUNET_PSYC_JoinCallback join_cb;
-
- /**
- * Closure for @a message_cb and @a join_cb.
- */
void *cb_cls;
/**
@@ -200,6 +195,11 @@
struct GNUNET_PSYC_Channel ch;
GNUNET_PSYC_MasterStartCallback start_cb;
+
+ /**
+ * Join handler callback.
+ */
+ GNUNET_PSYC_JoinCallback join_cb;
};
@@ -908,6 +908,18 @@
}
+static void
+handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
+ const struct MasterJoinRequest *req)
+{
+ // FIXME: extract join message from req[1]
+ const char *method_name = "_fixme";
+ struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+ mst->join_cb (mst->ch.cb_cls, &req->slave_key, method_name,
+ 0, NULL, NULL, 0, jh);
+}
+
+
/**
* Type of a function to call when we receive a message
* from the service.
@@ -951,6 +963,9 @@
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
size_eq = sizeof (struct GNUNET_MessageHeader);
break;
+ case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
+ size_min = sizeof (struct MasterJoinRequest);
+ break;
default:
GNUNET_break_op (0);
return;
@@ -988,6 +1003,11 @@
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
+ handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
+ (const struct MasterJoinRequest *) msg);
+ break;
}
if (NULL != ch->client)
@@ -1186,8 +1206,8 @@
req->policy = policy;
mst->start_cb = master_started_cb;
+ mst->join_cb = join_cb;
ch->message_cb = message_cb;
- ch->join_cb = join_cb;
ch->cb_cls = cls;
ch->cfg = cfg;
ch->is_master = GNUNET_YES;
@@ -1320,9 +1340,7 @@
* @param message_cb Function to invoke on message parts received from the
* channel, typically at least contains method handlers for @e join and
* @e part.
- * @param join_cb function invoked once we have joined with the current
- * message ID of the channel
- * @param slave_joined_cb Function to invoke when a peer wants to join.
+ * @param slave_joined_cb Function invoked once we have joined the channel.
* @param cls Closure for @a message_cb and @a slave_joined_cb.
* @param method_name Method name for the join request.
* @param env Environment containing transient variables for the request, or
NULL.
@@ -1339,7 +1357,6 @@
uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_JoinCallback join_cb,
GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
void *cls,
const char *method_name,
@@ -1362,7 +1379,6 @@
slv->join_cb = slave_joined_cb;
ch->message_cb = message_cb;
- ch->join_cb = join_cb;
ch->cb_cls = cls;
ch->cfg = cfg;
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/psyc/test_psyc.c 2014-05-17 10:16:15 UTC (rev 33313)
@@ -130,6 +130,7 @@
{
res = 1;
cleanup ();
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n");
}
@@ -144,6 +145,7 @@
{
res = 0;
cleanup ();
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test PASSED.\n");
}
@@ -181,7 +183,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Master got message part of type %u and size %u "
- "belonging to message ID %llu with flags %u\n",
+ "belonging to message ID %llu with flags %bu\n",
type, size, message_id, flags);
switch (test)
@@ -225,7 +227,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Slave got message part of type %u and size %u "
- "belonging to message ID %llu with flags %u\n",
+ "belonging to message ID %llu with flags %bu\n",
type, size, message_id, flags);
switch (test)
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33313 - in gnunet/src: include multicast psyc,
gnunet <=