search+present functional for multi filter
authorAdam Dickmeiss <adam@indexdata.dk>
Mon, 16 Jan 2006 01:10:19 +0000 (01:10 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Mon, 16 Jan 2006 01:10:19 +0000 (01:10 +0000)
src/filter_multi.cpp
src/filter_multi.hpp
src/filter_virt_db.cpp

index 09f5f2f..71343c6 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: filter_multi.cpp,v 1.1 2006-01-15 20:03:14 adam Exp $
+/* $Id: filter_multi.cpp,v 1.2 2006-01-16 01:10:19 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -31,12 +31,20 @@ namespace yp2 {
 
         struct Multi::BackendSet {
             BackendPtr m_backend;
-            long size;
+            int m_count;
+            bool operator < (const BackendSet &k) const;
         };
-        struct Multi::Set {
-            Set(std::string setname);
-            Set();
-            ~Set();
+        struct Multi::FrontendSet {
+            struct PresentJob {
+                BackendPtr m_backend;
+                int m_pos;
+                int m_inside_pos;
+            };
+            FrontendSet(std::string setname);
+            FrontendSet();
+            ~FrontendSet();
+
+            void round_robin(int pos, int number, std::list<PresentJob> &job);
 
             std::list<BackendSet> m_backend_sets;
             std::string m_setname;
@@ -55,15 +63,13 @@ namespace yp2 {
             bool m_is_multi;
             bool m_in_use;
             std::list<BackendPtr> m_backend_list;
-            std::map<std::string,Multi::Set> m_sets;
-            void multi_move();
+            std::map<std::string,Multi::FrontendSet> m_sets;
+
+            void multi_move(std::list<BackendPtr> &blist);
             void init(Package &package, Z_GDU *gdu);
             void close(Package &package);
             void search(Package &package, Z_APDU *apdu);
-#if 0
             void present(Package &package, Z_APDU *apdu);
-            void scan(Package &package, Z_APDU *apdu);
-#endif
             Rep *m_p;
         };            
         struct Multi::Map {
@@ -91,6 +97,11 @@ namespace yp2 {
 
 using namespace yp2;
 
+bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
+{
+    return m_count < k.m_count;
+}
+
 yf::Multi::Frontend::Frontend(Rep *rep)
 {
     m_p = rep;
@@ -147,18 +158,18 @@ void yf::Multi::Rep::release_frontend(Package &package)
     }
 }
 
-yf::Multi::Set::Set(std::string setname)
+yf::Multi::FrontendSet::FrontendSet(std::string setname)
     :  m_setname(setname)
 {
 }
 
 
-yf::Multi::Set::Set()
+yf::Multi::FrontendSet::FrontendSet()
 {
 }
 
 
-yf::Multi::Set::~Set()
+yf::Multi::FrontendSet::~FrontendSet()
 {
 }
 
@@ -205,17 +216,70 @@ void yf::Multi::Frontend::close(Package &package)
     }
 }
 
-void yf::Multi::Frontend::multi_move()
+void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
 {
     std::list<BackendPtr>::const_iterator bit;
     boost::thread_group g;
-    for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
+    for (bit = blist.begin(); bit != blist.end(); bit++)
     {
         g.add_thread(new boost::thread(**bit));
     }
     g.join_all();
 }
 
+
+void yf::Multi::FrontendSet::round_robin(int start, int number,
+                                         std::list<PresentJob> &jobs)
+{
+    int fetched = 0;
+    int p = 1;
+    bool eof = true;
+
+    std::list<int> pos;
+    std::list<int> inside_pos;
+    std::list<BackendSet>::const_iterator bsit;
+    for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
+    {
+        pos.push_back(1);
+        inside_pos.push_back(0);
+    }
+
+    std::list<int>::iterator psit = pos.begin();
+    std::list<int>::iterator esit = inside_pos.begin();
+    bsit = m_backend_sets.begin();
+    while (fetched < number)
+    {
+        if (bsit == m_backend_sets.end())
+        {
+            psit = pos.begin();
+            esit = inside_pos.begin();
+            bsit = m_backend_sets.begin();
+            if (eof)
+                break;
+            eof = true;
+        }
+        if (*psit <= bsit->m_count)
+        {
+            if (p >= start)
+            {
+                PresentJob job;
+                job.m_backend = bsit->m_backend;
+                job.m_pos = *psit;
+                job.m_inside_pos = *esit;
+                jobs.push_back(job);
+                (*esit)++;
+                fetched++;
+            }
+            (*psit)++;
+            p++;
+            eof = false;
+        }
+        psit++;
+        esit++;
+        bsit++;
+    }
+}
+
 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
 {
     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
@@ -275,7 +339,7 @@ void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
 
         b->m_package->copy_filter(package);
     }
-    multi_move();
+    multi_move(m_backend_list);
 
     // create the frontend init response based on each backend init response
     yp2::odr odr;
@@ -350,10 +414,13 @@ void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
         p->request() = apdu_req;
         p->copy_filter(package);
     }
-    multi_move();
+    multi_move(m_backend_list);
 
     // look at each response
-    int total_hits = 0;
+    FrontendSet resultSet(std::string(req->resultSetName));
+
+    int total_count = 0;
+    Z_Records *z_records_diag = 0;  // no diagnostics (yet)
     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
     {
         PackagePtr p = (*bit)->m_package;
@@ -367,8 +434,20 @@ void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
         {
             Z_APDU *b_apdu = gdu->u.z3950;
             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
-            
-            total_hits += *b_resp->resultCount;
+         
+            // see we get any errors (AKA diagnstics)
+            if (b_resp->records)
+            {
+                if (b_resp->records->which == Z_Records_NSD
+                    || b_resp->records->which == Z_Records_multipleNSD)
+                    z_records_diag = b_resp->records;
+                // we may set this multiple times (TOO BAD!)
+            }
+            BackendSet backendSet;
+            backendSet.m_backend = *bit;
+            backendSet.m_count = *b_resp->resultCount;
+            total_count += *b_resp->resultCount;
+            resultSet.m_backend_sets.push_back(backendSet);
         }
         else
         {
@@ -382,8 +461,150 @@ void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
 
-    *f_resp->resultCount = total_hits;
+    if (z_records_diag)
+    {
+        // search error
+        f_resp->records = z_records_diag;
+    }
+    else
+    {   // assume OK
+        m_sets[resultSet.m_setname] = resultSet;
+    }
+    *f_resp->resultCount = total_count;
+    
+    package.response() = f_apdu;
+}
 
+void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
+{
+    // create present request 
+    Z_PresentRequest *req = apdu_req->u.presentRequest;
+
+    Sets_it it;
+    it = m_sets.find(std::string(req->resultSetId));
+    if (it == m_sets.end())
+    {
+        yp2::odr odr;
+        Z_APDU *apdu = 
+            odr.create_presentResponse(
+                apdu_req,
+                YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
+                req->resultSetId);
+        package.response() = apdu;
+        return;
+    }
+    std::list<Multi::FrontendSet::PresentJob> jobs;
+    int start = *req->resultSetStartPoint;
+    int number = *req->numberOfRecordsRequested;
+    it->second.round_robin(start, number, jobs);
+
+    std::list<BackendPtr> present_backend_list;
+
+    std::list<BackendSet>::const_iterator bsit;
+    bsit = it->second.m_backend_sets.begin();
+    for (; bsit != it->second.m_backend_sets.end(); bsit++)
+    {
+        std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
+        int start = -1;
+        int end = -1;
+        
+        for (jit = jobs.begin(); jit != jobs.end(); jit++)
+        {
+            if (jit->m_backend == bsit->m_backend)
+            {
+                if (start == -1 || jit->m_pos < start)
+                    start = jit->m_pos;
+                if (end == -1 || jit->m_pos > end)
+                    end = jit->m_pos;
+            }
+        }
+        if (start != -1)
+        {
+            PackagePtr p = bsit->m_backend->m_package;
+
+            *req->resultSetStartPoint = start;
+            *req->numberOfRecordsRequested = end - start + 1;
+            
+            p->request() = apdu_req;
+            p->copy_filter(package);
+
+            present_backend_list.push_back(bsit->m_backend);
+        }
+    }
+    multi_move(present_backend_list);
+
+    // look at each response
+    Z_Records *z_records_diag = 0;
+
+    std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
+    for (; pbit != present_backend_list.end(); pbit++)
+    {
+        PackagePtr p = (*pbit)->m_package;
+        
+        if (p->session().is_closed()) // if any backend closes, close frontend
+            package.session().close();
+        
+        Z_GDU *gdu = p->response().get();
+        if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
+            Z_APDU_presentResponse)
+        {
+            Z_APDU *b_apdu = gdu->u.z3950;
+            Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
+         
+            // see we get any errors (AKA diagnstics)
+            if (b_resp->records)
+            {
+                if (b_resp->records->which != Z_Records_DBOSD)
+                    z_records_diag = b_resp->records;
+                // we may set this multiple times (TOO BAD!)
+            }
+        }
+        else
+        {
+            // if any target does not return present response - return that 
+            package.response() = p->response();
+            return;
+        }
+    }
+
+    yp2::odr odr;
+    Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
+    Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
+
+    if (z_records_diag)
+    {
+        f_resp->records = z_records_diag;
+        *f_resp->presentStatus = Z_PresentStatus_failure;
+    }
+    else
+    {
+        f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
+        Z_Records * records = f_resp->records;
+        records->which = Z_Records_DBOSD;
+        records->u.databaseOrSurDiagnostics =
+            (Z_NamePlusRecordList *)
+            odr_malloc(odr, sizeof(Z_NamePlusRecordList));
+        Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
+        nprl->num_records = jobs.size();
+        nprl->records = (Z_NamePlusRecord**)
+            odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
+        int i = 0;
+        std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
+        for (jit = jobs.begin(); jit != jobs.end(); jit++)
+        {
+            PackagePtr p = jit->m_backend->m_package;
+            
+            Z_GDU *gdu = p->response().get();
+            Z_APDU *b_apdu = gdu->u.z3950;
+            Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
+
+            nprl->records[i++] =
+                b_resp->records->u.databaseOrSurDiagnostics->
+                records[jit->m_inside_pos];
+        }
+        *f_resp->nextResultSetPosition = start + i;
+        *f_resp->numberOfRecordsReturned = i;
+    }
     package.response() = f_apdu;
 }
 
@@ -418,6 +639,10 @@ void yf::Multi::process(Package &package) const
         {
             f->search(package, apdu);
         }
+        else if (apdu->which == Z_APDU_presentRequest)
+        {
+            f->present(package, apdu);
+        }
         else
         {
             yp2::odr odr;
index 98d7210..fc6eda3 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: filter_multi.hpp,v 1.1 2006-01-15 20:03:14 adam Exp $
+/* $Id: filter_multi.hpp,v 1.2 2006-01-16 01:10:19 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -21,13 +21,13 @@ namespace yp2 {
             class Rep;
             class Frontend;
             class Map;
-            class Set;
+            class FrontendSet;
             class Backend;
             class BackendSet;
             typedef boost::shared_ptr<Backend> BackendPtr;
             typedef boost::shared_ptr<Frontend> FrontendPtr;
             typedef boost::shared_ptr<Package> PackagePtr;
-            typedef std::map<std::string,Set>::iterator Sets_it;
+            typedef std::map<std::string,FrontendSet>::iterator Sets_it;
         public:
             ~Multi();
             Multi();
index dbd0fc3..fa2e5a1 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: filter_virt_db.cpp,v 1.24 2006-01-14 08:38:57 adam Exp $
+/* $Id: filter_virt_db.cpp,v 1.25 2006-01-16 01:10:19 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -443,7 +443,6 @@ void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
                 resultSetId.c_str());
         package.response() = apdu;
-        
         return;
     }
     Session *id =