gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14591 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r14591 - gnunet/src/fs
Date: Fri, 4 Mar 2011 13:39:07 +0100

Author: grothoff
Date: 2011-03-04 13:39:07 +0100 (Fri, 04 Mar 2011)
New Revision: 14591

Modified:
   gnunet/src/fs/gnunet-service-fs.h
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_pr.h
Log:
fixes

Modified: gnunet/src/fs/gnunet-service-fs.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs.h   2011-03-04 12:09:54 UTC (rev 14590)
+++ gnunet/src/fs/gnunet-service-fs.h   2011-03-04 12:39:07 UTC (rev 14591)
@@ -99,6 +99,13 @@
 extern unsigned int GSF_cover_query_count;
 
 
+/**
+ * Our block context.
+ */
+extern struct GNUNET_BLOCK_Context *GSF_block_ctx;
 
+
+
+
 #endif
 /* end of gnunet-service-fs.h */

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2011-03-04 12:09:54 UTC (rev 14590)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2011-03-04 12:39:07 UTC (rev 14591)
@@ -24,6 +24,8 @@
  * @author Christian Grothoff
  */
 #include "platform.h"
+#include "gnunet_load_lib.h"
+#include "gnunet-service-fs_cp.h"
 #include "gnunet-service-fs_pr.h"
 
 
@@ -111,6 +113,14 @@
 
 
 /**
+ * Maximum number of requests (from other peers, overall) that we're
+ * willing to have pending at any given point in time.  Can be changed
+ * via the configuration file (32k is just the default).
+ */
+static unsigned long long max_pending_requests = (32 * 1024);
+
+
+/**
  * How many bytes should a bloomfilter be if we have already seen
  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
  * of bits set per entry.  Furthermore, we should not re-size the
@@ -157,8 +167,8 @@
   size_t nsize;
   GNUNET_HashCode mhash;
 
-  nsize = compute_bloomfilter_size (pr->replies_seen_off);
-  if ( (bf != NULL) &&
+  nsize = compute_bloomfilter_size (pr->replies_seen_count);
+  if ( (pr->bf != NULL) &&
        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
     return GNUNET_NO; /* size not changed */
   if (pr->bf != NULL)
@@ -221,7 +231,7 @@
   
   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
   pr->public_data.query = *query;
-  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+  if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
     {
       GNUNET_assert (NULL != namespace);
       pr->public_data.namespace = *namespace;
@@ -229,9 +239,9 @@
   if (NULL != target)
     {
       pr->public_data.target = *target;
-      pr->has_target = GNUNET_YES;
+      pr->public_data.has_target = GNUNET_YES;
     }
-  pr->public_data.anonymity_level = anonymity_data;
+  pr->public_data.anonymity_level = anonymity_level;
   pr->public_data.priority = priority;
   pr->public_data.original_priority = priority;
   pr->public_data.options = options;
@@ -240,19 +250,19 @@
   pr->rh = rh;
   pr->rh_cls = rh_cls;
   if (ttl >= 0)
-    pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS,
-                                                                              
(uint32_t) ttl));
+    pr->public_data.ttl = GNUNET_TIME_relative_to_absolute 
(GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                                                               
           (uint32_t) ttl));
   else
-    pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
-                                            GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS,
-                                                                           
(uint32_t) (- ttl)));
+    pr->public_data.ttl = GNUNET_TIME_absolute_subtract 
(pr->public_data.start_time,
+                                                        
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                                                               
        (uint32_t) (- ttl)));
   if (replies_seen_count > 0)
     {
       pr->replies_seen_size = replies_seen_count;
       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * 
pr->replies_seen_size);
       memcpy (pr->replies_seen,
              replies_seen,
-             replies_seen_count * sizeof (struct GNUNET_HashCode));
+             replies_seen_count * sizeof (GNUNET_HashCode));
       pr->replies_seen_count = replies_seen_count;
     }
   if (NULL != bf_data)    
@@ -275,7 +285,7 @@
     {
       pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
                                                pr,
-                                               pr->ttl.abs_value);
+                                               pr->public_data.ttl.abs_value);
       /* make sure we don't track too many requests */
       while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > 
max_pending_requests)
        {
@@ -326,7 +336,7 @@
 
   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
     return; /* integer overflow */
-  if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
+  if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
     {
       /* we're responsible for the BF, full refresh */
       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
@@ -336,7 +346,7 @@
       memcpy (&pr->replies_seen[pr->replies_seen_count],
              replies_seen,
              sizeof (GNUNET_HashCode) * replies_seen_count);
-      pr->replies_seen_count += replies_seen;
+      pr->replies_seen_count += replies_seen_count;
       if (GNUNET_NO == refresh_bloomfilter (pr))
        {
          /* bf not recalculated, simply extend it with new bits */
@@ -357,8 +367,8 @@
             any bloom-filter, so we need to create one on-the-fly */
          pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
                                                 UINT32_MAX);
-         pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size 
(replies_seen_count),
-                                                     pr->mingle,
+         pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+                                                     compute_bloomfilter_size 
(replies_seen_count),
                                                      BLOOMFILTER_K);
        }
       for (i=0;i<pr->replies_seen_count;i++)
@@ -388,16 +398,16 @@
                                  size_t buf_size,
                                  void *buf)
 {
-  struct PendingMessage *pm;
   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
   struct GetMessage *gm;
   GNUNET_HashCode *ext;
   size_t msize;
   unsigned int k;
-  int no_route;
   uint32_t bm;
   uint32_t prio;
   size_t bf_size;
+  struct GNUNET_TIME_Absolute now;
+  int64_t ttl;
 
   k = 0;
   bm = 0;
@@ -406,12 +416,12 @@
       bm |= GET_MESSAGE_BIT_RETURN_TO;
       k++;      
     }
-  if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
+  if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
     {
       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
       k++;
     }
-  if (GNUNET_YES == pr->has_target)
+  if (GNUNET_YES == pr->public_data.has_target)
     {
       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
       k++;
@@ -424,7 +434,7 @@
   gm = (struct GetMessage*) lbuf;
   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
   gm->header.size = htons (msize);
-  gm->type = htonl (pr->type);
+  gm->type = htonl (pr->public_data.type);
   if (GNUNET_YES == do_route)
     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                     pr->public_data.priority + 1);
@@ -432,18 +442,24 @@
     prio = 0;
   pr->public_data.priority -= prio;
   gm->priority = htonl (prio);
-  gm->ttl = htonl (pr->ttl);
+  now = GNUNET_TIME_absolute_get ();
+  ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
+  gm->ttl = htonl (ttl / 1000);
   gm->filter_mutator = htonl(pr->mingle); 
   gm->hash_bitmap = htonl (bm);
-  gm->query = pr->query;
+  gm->query = pr->public_data.query;
   ext = (GNUNET_HashCode*) &gm[1];
-  k = 0;
+  k = 0;  
   if (GNUNET_YES != do_route)
-    GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
-  if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
-    memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
-  if (GNUNET_YES == pr->has_target)
-    GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) 
&ext[k++]);
+    GNUNET_PEER_resolve (pr->cp->pid, 
+                        (struct GNUNET_PeerIdentity*) &ext[k++]);
+  if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
+    memcpy (&ext[k++], 
+           &pr->public_data.namespace, 
+           sizeof (GNUNET_HashCode));
+  if (GNUNET_YES == pr->public_data.has_target)
+    GNUNET_PEER_resolve (pr->public_data.target_pid, 
+                        (struct GNUNET_PeerIdentity*) &ext[k++]);
   if (pr->bf != NULL)
     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
                                               (char*) &ext[k],
@@ -582,13 +598,10 @@
 update_request_performance_data (struct ProcessReplyClosure *prq,
                                 struct GSF_PendingRequest *pr)
 {
-  unsigned int i;
-  struct GNUNET_TIME_Relative cur_delay;
-
   if (prq->sender == NULL)
     return;      
   GSF_peer_update_performance_ (prq->sender,
-                               pr->start_time,
+                               pr->public_data.start_time,
                                prq->priority);
 }
                                
@@ -608,12 +621,6 @@
 {
   struct ProcessReplyClosure *prq = cls;
   struct GSF_PendingRequest *pr = value;
-  struct PendingMessage *reply;
-  struct ClientResponseMessage *creply;
-  struct ClientList *cl;
-  struct PutMessage *pm;
-  struct ConnectedPeer *cp;
-  size_t msize;
   GNUNET_HashCode chash;
 
 #if DEBUG_FS
@@ -622,16 +629,17 @@
              (unsigned int) prq->type,
              GNUNET_h2s (key));
 #endif  
-  GNUNET_STATISTICS_update (stats,
+  GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# replies received and matched"),
                            1,
                            GNUNET_NO);
-  prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+  prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
                                     prq->type,
                                     key,
                                     &pr->bf,
                                     pr->mingle,
-                                    pr->namespace, (pr->namespace != NULL) ? 
sizeof (GNUNET_HashCode) : 0,
+                                    &pr->public_data.namespace, 
+                                    (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) 
? sizeof (GNUNET_HashCode) : 0,
                                     prq->data,
                                     prq->size);
   switch (prq->eval)
@@ -642,15 +650,19 @@
     case GNUNET_BLOCK_EVALUATION_OK_LAST:
       /* short cut: stop processing early, no BF-update, etc. */
       update_request_performance_data (prq, pr);
-      GNUNET_LOAD_update (rt_entry_lifetime,
-                         GNUNET_TIME_absolute_get_duration 
(pr->start_time).rel_value);
+      GNUNET_LOAD_update (GSF_rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration 
(pr->public_data.start_time).rel_value);
       /* pass on to other peers / local clients */
-      pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO);
+      pr->rh (pr->rh_cls,            
+             pr,
+             prq->expiration,
+             prq->data, prq->size, 
+             GNUNET_NO);
       /* destroy request, we're done */
       GSF_pending_request_cancel_ (pr);
       return GNUNET_YES;
     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
-      GNUNET_STATISTICS_update (stats,
+      GNUNET_STATISTICS_update (GSF_stats,
                                gettext_noop ("# duplicate replies discarded (bloomfilter)"),
                                1,
                                GNUNET_NO);
@@ -686,18 +698,22 @@
                  "Found result for query `%s' in local datastore\n",
                  GNUNET_h2s (key));
 #endif
-      GNUNET_STATISTICS_update (stats,
+      GNUNET_STATISTICS_update (GSF_stats,
                                gettext_noop ("# results found locally"),
                                1,
                                GNUNET_NO);      
     }
   prq->priority += pr->public_data.original_priority;
-  pr->public_data.remaining_priority = 0;
+  pr->public_data.priority = 0;
   pr->public_data.original_priority = 0;
   pr->public_data.results_found++;
   prq->request_found = GNUNET_YES;
   /* finally, pass on to other peer / local client */
-  pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
+  pr->rh (pr->rh_cls,
+         pr, 
+         prq->expiration,
+         prq->data, prq->size, 
+         GNUNET_YES);
   return GNUNET_YES;
 }
 
@@ -725,7 +741,7 @@
                      delay.rel_value);
   if (GNUNET_OK == success)
     return;
-  GNUNET_STATISTICS_update (stats,
+  GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# datastore 'put' failures"),
                            1,
                            GNUNET_NO);
@@ -750,7 +766,7 @@
   ld = GNUNET_LOAD_get_load (datastore_put_load);
   if (ld < 2.0 * (1 + priority))
     return GNUNET_NO;
-  GNUNET_STATISTICS_update (stats,
+  GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# storage requests dropped due to high load"),
                            1,
                            GNUNET_NO);
@@ -776,7 +792,7 @@
 void
 GSF_handle_dht_reply_ (void *cls,
                       struct GNUNET_TIME_Absolute exp,
-                      const GNUNET_HashCode * key,
+                      const GNUNET_HashCode *key,
                       const struct GNUNET_PeerIdentity * const *get_path,
                       const struct GNUNET_PeerIdentity * const *put_path,
                       enum GNUNET_BLOCK_Type type,
@@ -785,6 +801,7 @@
 {
   struct GSF_PendingRequest *pr = cls;
   struct ProcessReplyClosure prq;
+  struct GNUNET_TIME_Absolute *start;
 
   memset (&prq, 0, sizeof (prq));
   prq.data = data;
@@ -803,10 +820,10 @@
 #endif
       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
       *start = GNUNET_TIME_absolute_get ();
-      GNUNET_DATASTORE_put (dsh,
-                           0, &query, dsize, &put[1],
+      GNUNET_DATASTORE_put (GSF_dsh,
+                           0, key, size, data,
                            type, prq.priority, 1 /* anonymity */, 
-                           expiration, 
+                           exp, 
                            1 + prq.priority, MAX_DATASTORE_QUEUE,
                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                            &put_migration_continuation, 
@@ -856,7 +873,7 @@
   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
     return GNUNET_SYSERR;
   if (GNUNET_OK !=
-      GNUNET_BLOCK_get_key (block_ctx,
+      GNUNET_BLOCK_get_key (GSF_block_ctx,
                            type,
                            &put[1],
                            dsize,
@@ -878,14 +895,14 @@
   prq.anonymity_level = 1;
   prq.finished = GNUNET_NO;
   prq.request_found = GNUNET_NO;
-  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+  GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
                                              &query,
                                              &process_reply,
                                              &prq);
   if (NULL != cp)
     {
-      GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 
* prq.priority);
-      GSF_get_peer_performance_data (cp)->trust += prq.priority;
+      GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 
1000 * prq.priority);
+      GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
     }
   if ( (GNUNET_YES == active_to_migration) &&
        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
@@ -898,7 +915,7 @@
 #endif
       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
       *start = GNUNET_TIME_absolute_get ();
-      GNUNET_DATASTORE_put (dsh,
+      GNUNET_DATASTORE_put (GSF_dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
                            expiration, 
@@ -918,7 +935,7 @@
       block_time = GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_MILLISECONDS,
                                                  5000 + 
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                                                
   (unsigned int) (60000 * putl * putl)));
-      GSF_block_peer_migration (cp, block_time);
+      GSF_block_peer_migration_ (cp, block_time);
     }
   return GNUNET_OK;
 }
@@ -926,10 +943,22 @@
 
 /**
  * Setup the subsystem.
+ *
+ * @param cfg configuration to use
  */
 void
-GSF_pending_request_init_ ()
+GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
 {
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (cfg,
+                                            "fs",
+                                            "MAX_PENDING_REQUESTS",
+                                            &max_pending_requests))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Configuration fails to specify `%s', assuming default value."),
+                 "MAX_PENDING_REQUESTS");
+    }
   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create 
(GNUNET_CONTAINER_HEAP_ORDER_MIN); 
 }

Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h        2011-03-04 12:09:54 UTC (rev 14590)
+++ gnunet/src/fs/gnunet-service-fs_pr.h        2011-03-04 12:39:07 UTC (rev 14591)
@@ -323,9 +323,11 @@
 
 /**
  * Setup the subsystem.
+ *
+ * @param cfg configuration to use
  */
 void
-GSF_pending_request_init_ (void);
+GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg);
 
 
 /**




reply via email to

[Prev in Thread] Current Thread [Next in Thread]