[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r14591 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r14591 - gnunet/src/fs |
Date: |
Fri, 4 Mar 2011 13:39:07 +0100 |
Author: grothoff
Date: 2011-03-04 13:39:07 +0100 (Fri, 04 Mar 2011)
New Revision: 14591
Modified:
gnunet/src/fs/gnunet-service-fs.h
gnunet/src/fs/gnunet-service-fs_pr.c
gnunet/src/fs/gnunet-service-fs_pr.h
Log:
fixes
Modified: gnunet/src/fs/gnunet-service-fs.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs.h 2011-03-04 12:09:54 UTC (rev 14590)
+++ gnunet/src/fs/gnunet-service-fs.h 2011-03-04 12:39:07 UTC (rev 14591)
@@ -99,6 +99,13 @@
extern unsigned int GSF_cover_query_count;
+/**
+ * Our block context.
+ */
+extern struct GNUNET_BLOCK_Context *GSF_block_ctx;
+
+
+
#endif
/* end of gnunet-service-fs.h */
Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c 2011-03-04 12:09:54 UTC (rev 14590)
+++ gnunet/src/fs/gnunet-service-fs_pr.c 2011-03-04 12:39:07 UTC (rev 14591)
@@ -24,6 +24,8 @@
* @author Christian Grothoff
*/
#include "platform.h"
+#include "gnunet_load_lib.h"
+#include "gnunet-service-fs_cp.h"
#include "gnunet-service-fs_pr.h"
@@ -111,6 +113,14 @@
/**
+ * Maximum number of requests (from other peers, overall) that we're
+ * willing to have pending at any given point in time. Can be changed
+ * via the configuration file (32k is just the default).
+ */
+static unsigned long long max_pending_requests = (32 * 1024);
+
+
+/**
* How many bytes should a bloomfilter be if we have already seen
* entry_count responses? Note that BLOOMFILTER_K gives us the number
* of bits set per entry. Furthermore, we should not re-size the
@@ -157,8 +167,8 @@
size_t nsize;
GNUNET_HashCode mhash;
- nsize = compute_bloomfilter_size (pr->replies_seen_off);
- if ( (bf != NULL) &&
+ nsize = compute_bloomfilter_size (pr->replies_seen_count);
+ if ( (pr->bf != NULL) &&
(nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
return GNUNET_NO; /* size not changed */
if (pr->bf != NULL)
@@ -221,7 +231,7 @@
pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
pr->public_data.query = *query;
- if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+ if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
{
GNUNET_assert (NULL != namespace);
pr->public_data.namespace = *namespace;
@@ -229,9 +239,9 @@
if (NULL != target)
{
pr->public_data.target = *target;
- pr->has_target = GNUNET_YES;
+ pr->public_data.has_target = GNUNET_YES;
}
- pr->public_data.anonymity_level = anonymity_data;
+ pr->public_data.anonymity_level = anonymity_level;
pr->public_data.priority = priority;
pr->public_data.original_priority = priority;
pr->public_data.options = options;
@@ -240,19 +250,19 @@
pr->rh = rh;
pr->rh_cls = rh_cls;
if (ttl >= 0)
- pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS,
-
(uint32_t) ttl));
+ pr->public_data.ttl = GNUNET_TIME_relative_to_absolute
(GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+
(uint32_t) ttl));
else
- pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
- GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS,
-
(uint32_t) (- ttl)));
+ pr->public_data.ttl = GNUNET_TIME_absolute_subtract
(pr->public_data.start_time,
+
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+
(uint32_t) (- ttl)));
if (replies_seen_count > 0)
{
pr->replies_seen_size = replies_seen_count;
pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) *
pr->replies_seen_size);
memcpy (pr->replies_seen,
replies_seen,
- replies_seen_count * sizeof (struct GNUNET_HashCode));
+ replies_seen_count * sizeof (GNUNET_HashCode));
pr->replies_seen_count = replies_seen_count;
}
if (NULL != bf_data)
@@ -275,7 +285,7 @@
{
pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
pr,
- pr->ttl.abs_value);
+ pr->public_data.ttl.abs_value);
/* make sure we don't track too many requests */
while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
max_pending_requests)
{
@@ -326,7 +336,7 @@
if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
return; /* integer overflow */
- if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
+ if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
{
/* we're responsible for the BF, full refresh */
if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
@@ -336,7 +346,7 @@
memcpy (&pr->replies_seen[pr->replies_seen_count],
replies_seen,
sizeof (GNUNET_HashCode) * replies_seen_count);
- pr->replies_seen_count += replies_seen;
+ pr->replies_seen_count += replies_seen_count;
if (GNUNET_NO == refresh_bloomfilter (pr))
{
/* bf not recalculated, simply extend it with new bits */
@@ -357,8 +367,8 @@
any bloom-filter, so we need to create one on-the-fly */
pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
UINT32_MAX);
- pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size
(replies_seen_count),
- pr->mingle,
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+ compute_bloomfilter_size
(replies_seen_count),
BLOOMFILTER_K);
}
for (i=0;i<pr->replies_seen_count;i++)
@@ -388,16 +398,16 @@
size_t buf_size,
void *buf)
{
- struct PendingMessage *pm;
char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
struct GetMessage *gm;
GNUNET_HashCode *ext;
size_t msize;
unsigned int k;
- int no_route;
uint32_t bm;
uint32_t prio;
size_t bf_size;
+ struct GNUNET_TIME_Absolute now;
+ int64_t ttl;
k = 0;
bm = 0;
@@ -406,12 +416,12 @@
bm |= GET_MESSAGE_BIT_RETURN_TO;
k++;
}
- if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
+ if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
{
bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
k++;
}
- if (GNUNET_YES == pr->has_target)
+ if (GNUNET_YES == pr->public_data.has_target)
{
bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
k++;
@@ -424,7 +434,7 @@
gm = (struct GetMessage*) lbuf;
gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
gm->header.size = htons (msize);
- gm->type = htonl (pr->type);
+ gm->type = htonl (pr->public_data.type);
if (GNUNET_YES == do_route)
prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
pr->public_data.priority + 1);
@@ -432,18 +442,24 @@
prio = 0;
pr->public_data.priority -= prio;
gm->priority = htonl (prio);
- gm->ttl = htonl (pr->ttl);
+ now = GNUNET_TIME_absolute_get ();
+ ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
+ gm->ttl = htonl (ttl / 1000);
gm->filter_mutator = htonl(pr->mingle);
gm->hash_bitmap = htonl (bm);
- gm->query = pr->query;
+ gm->query = pr->public_data.query;
ext = (GNUNET_HashCode*) &gm[1];
- k = 0;
+ k = 0;
if (GNUNET_YES != do_route)
- GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
- if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
- memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
- if (GNUNET_YES == pr->has_target)
- GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*)
&ext[k++]);
+ GNUNET_PEER_resolve (pr->cp->pid,
+ (struct GNUNET_PeerIdentity*) &ext[k++]);
+ if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
+ memcpy (&ext[k++],
+ &pr->public_data.namespace,
+ sizeof (GNUNET_HashCode));
+ if (GNUNET_YES == pr->public_data.has_target)
+ GNUNET_PEER_resolve (pr->public_data.target_pid,
+ (struct GNUNET_PeerIdentity*) &ext[k++]);
if (pr->bf != NULL)
GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
(char*) &ext[k],
@@ -582,13 +598,10 @@
update_request_performance_data (struct ProcessReplyClosure *prq,
struct GSF_PendingRequest *pr)
{
- unsigned int i;
- struct GNUNET_TIME_Relative cur_delay;
-
if (prq->sender == NULL)
return;
GSF_peer_update_performance_ (prq->sender,
- pr->start_time,
+ pr->public_data.start_time,
prq->priority);
}
@@ -608,12 +621,6 @@
{
struct ProcessReplyClosure *prq = cls;
struct GSF_PendingRequest *pr = value;
- struct PendingMessage *reply;
- struct ClientResponseMessage *creply;
- struct ClientList *cl;
- struct PutMessage *pm;
- struct ConnectedPeer *cp;
- size_t msize;
GNUNET_HashCode chash;
#if DEBUG_FS
@@ -622,16 +629,17 @@
(unsigned int) prq->type,
GNUNET_h2s (key));
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received and matched"),
1,
GNUNET_NO);
- prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+ prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
prq->type,
key,
&pr->bf,
pr->mingle,
- pr->namespace, (pr->namespace != NULL) ?
sizeof (GNUNET_HashCode) : 0,
+ &pr->public_data.namespace,
+ (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK)
? sizeof (GNUNET_HashCode) : 0,
prq->data,
prq->size);
switch (prq->eval)
@@ -642,15 +650,19 @@
case GNUNET_BLOCK_EVALUATION_OK_LAST:
/* short cut: stop processing early, no BF-update, etc. */
update_request_performance_data (prq, pr);
- GNUNET_LOAD_update (rt_entry_lifetime,
- GNUNET_TIME_absolute_get_duration
(pr->start_time).rel_value);
+ GNUNET_LOAD_update (GSF_rt_entry_lifetime,
+ GNUNET_TIME_absolute_get_duration
(pr->public_data.start_time).rel_value);
/* pass on to other peers / local clients */
- pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO);
+ pr->rh (pr->rh_cls,
+ pr,
+ prq->expiration,
+ prq->data, prq->size,
+ GNUNET_NO);
/* destroy request, we're done */
GSF_pending_request_cancel_ (pr);
return GNUNET_YES;
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# duplicate replies discarded (bloomfilter)"),
1,
GNUNET_NO);
@@ -686,18 +698,22 @@
"Found result for query `%s' in local datastore\n",
GNUNET_h2s (key));
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# results found locally"),
1,
GNUNET_NO);
}
prq->priority += pr->public_data.original_priority;
- pr->public_data.remaining_priority = 0;
+ pr->public_data.priority = 0;
pr->public_data.original_priority = 0;
pr->public_data.results_found++;
prq->request_found = GNUNET_YES;
/* finally, pass on to other peer / local client */
- pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
+ pr->rh (pr->rh_cls,
+ pr,
+ prq->expiration,
+ prq->data, prq->size,
+ GNUNET_YES);
return GNUNET_YES;
}
@@ -725,7 +741,7 @@
delay.rel_value);
if (GNUNET_OK == success)
return;
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# datastore 'put' failures"),
1,
GNUNET_NO);
@@ -750,7 +766,7 @@
ld = GNUNET_LOAD_get_load (datastore_put_load);
if (ld < 2.0 * (1 + priority))
return GNUNET_NO;
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# storage requests dropped due to high load"),
1,
GNUNET_NO);
@@ -776,7 +792,7 @@
void
GSF_handle_dht_reply_ (void *cls,
struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode * key,
+ const GNUNET_HashCode *key,
const struct GNUNET_PeerIdentity * const *get_path,
const struct GNUNET_PeerIdentity * const *put_path,
enum GNUNET_BLOCK_Type type,
@@ -785,6 +801,7 @@
{
struct GSF_PendingRequest *pr = cls;
struct ProcessReplyClosure prq;
+ struct GNUNET_TIME_Absolute *start;
memset (&prq, 0, sizeof (prq));
prq.data = data;
@@ -803,10 +820,10 @@
#endif
start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
*start = GNUNET_TIME_absolute_get ();
- GNUNET_DATASTORE_put (dsh,
- 0, &query, dsize, &put[1],
+ GNUNET_DATASTORE_put (GSF_dsh,
+ 0, key, size, data,
type, prq.priority, 1 /* anonymity */,
- expiration,
+ exp,
1 + prq.priority, MAX_DATASTORE_QUEUE,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
&put_migration_continuation,
@@ -856,7 +873,7 @@
if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
return GNUNET_SYSERR;
if (GNUNET_OK !=
- GNUNET_BLOCK_get_key (block_ctx,
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
type,
&put[1],
dsize,
@@ -878,14 +895,14 @@
prq.anonymity_level = 1;
prq.finished = GNUNET_NO;
prq.request_found = GNUNET_NO;
- GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
&query,
&process_reply,
&prq);
if (NULL != cp)
{
- GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000
* prq.priority);
- GSF_get_peer_performance_data (cp)->trust += prq.priority;
+ GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE +
1000 * prq.priority);
+ GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
}
if ( (GNUNET_YES == active_to_migration) &&
(GNUNET_NO == test_put_load_too_high (prq.priority)) )
@@ -898,7 +915,7 @@
#endif
start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
*start = GNUNET_TIME_absolute_get ();
- GNUNET_DATASTORE_put (dsh,
+ GNUNET_DATASTORE_put (GSF_dsh,
0, &query, dsize, &put[1],
type, prq.priority, 1 /* anonymity */,
expiration,
@@ -918,7 +935,7 @@
block_time = GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_MILLISECONDS,
5000 +
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
(unsigned int) (60000 * putl * putl)));
- GSF_block_peer_migration (cp, block_time);
+ GSF_block_peer_migration_ (cp, block_time);
}
return GNUNET_OK;
}
@@ -926,10 +943,22 @@
/**
* Setup the subsystem.
+ *
+ * @param cfg configuration to use
*/
void
-GSF_pending_request_init_ ()
+GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
{
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (cfg,
+ "fs",
+ "MAX_PENDING_REQUESTS",
+ &max_pending_requests))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Configuration fails to specify `%s', assuming default value."),
+ "MAX_PENDING_REQUESTS");
+ }
pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
requests_by_expiration_heap = GNUNET_CONTAINER_heap_create
(GNUNET_CONTAINER_HEAP_ORDER_MIN);
}
Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h 2011-03-04 12:09:54 UTC (rev 14590)
+++ gnunet/src/fs/gnunet-service-fs_pr.h 2011-03-04 12:39:07 UTC (rev 14591)
@@ -323,9 +323,11 @@
/**
* Setup the subsystem.
+ *
+ * @param cfg configuration to use
*/
void
-GSF_pending_request_init_ (void);
+GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg);
/**
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r14591 - gnunet/src/fs,
gnunet <=