[Xapian-discuss] Multithread problem: Writing to a db disables reading from another one

Robert Pollak robert.pollak at fabasoft.com
Tue Jul 13 12:32:25 BST 2004


Hi to all,

I have Xapian 0.8.1 installed on a single processor RH9 machine with a custom 2.4.20 kernel.
(I used ./configure without parameters.)
I have followed the recent "threaded test" discussion here, and I have tested the examples:

- Richard Boulton's example with one database, one writer, and three readers (1)
works fine, I only see the expected DatabaseModifiedError.
- Eric B. Ridge's example with just two writers to separate databases (2)
also shows no errors (I stopped after 162 docs were added).

I then expanded Richard's example by adding a second writer that writes to a separate database (see attachment).
(I am also reopening the db if necessary.)
Now the queries give empty results!
Commenting out the creation of _writer2 makes the queries return the expected results again (3).

I wonder whether Olly or someone else can reproduce this.
I have not tried modifying any Xapian compile options, because I am an automake newbie.
Would I have to edit Makefile.in (and how whould I do this), or would I just pass arguments to configure?


Footnotes:
1: from 17 Jun 2004 14:25:37 +0100, see <http://article.gmane.org/gmane.comp.search.xapian.general/892>
2: from 17 Jun 2004 00:30:50 +0000, see <http://article.gmane.org/gmane.comp.search.xapian.general/876>
3:
$ ./xapian_threads
Writer opening 1.db
Writer flushing 1.db
Reader opening 1.db
1.db results: 10
Reader opening 1.db
1.db results: 10
Reader opening 1.db
1.db results: 10
Readers started
Writer flushing 1.db
1.db results: 10
1.db results: 10
1.db results: 10
Writer flushing 1.db
Reader reopening 1.db
1.db results: 30
Reader reopening 1.db
1.db results: 30
Reader reopening 1.db
1.db results: 30
1.db results: 30
1.db results: 30
1.db results: 30
Writer flushing 1.db
1.db results: 30
1.db results: 30
1.db results: 30
Reader reopening 1.db
1.db results: 40

-- 
Robert Pollak
GPG Key ID: 748646AD
-------------- next part --------------
#include <xapian.h>
#include <pthread.h>
#include <unistd.h>
#include <iostream>
#include <sstream>

#define MAX_THREADS 3

using namespace std;

pthread_t _writer1, _writer2;
pthread_t _readers[MAX_THREADS];
pthread_mutex_t output_mutex;

void msg(const string& msg)
{
  pthread_mutex_lock(&output_mutex);
  cout << msg << endl;
  pthread_mutex_unlock(&output_mutex);
}

void *do_writes(void *param)
{
  char *dbpath = (char *)param;
	msg(string("Writer opening ") + dbpath);
  unlink((string(dbpath) + "/db_lock").c_str());
  Xapian::WritableDatabase db = Xapian::Auto::open(dbpath, Xapian::DB_CREATE_OR_OVERWRITE);
  char *rnd = (char *)malloc(255);
  memset(rnd, 0, 255);
  long cnt = 0;

  try {
    // endlessly add a 2000 term document to the database
    while(true) {
      Xapian::Document doc;
      int i = 0;

      for(; i < 1000; i++) {
        doc.add_posting("random", i);

        sprintf(rnd, "%d", rand());
        doc.add_posting(rnd, i);
      }

      db.add_document(doc);

      if(++cnt % 10 == 0) {
        // flush every 10 documents
        db.flush();
        msg(string("Writer flushing ") + dbpath);
      }
    }
  }
  catch(Xapian::Error & err) {
    msg(string("Writer accessing ") + dbpath + ":  ERROR=" + err.get_msg());
  }
  msg("Writer thread unexpectedly exited...");
  pthread_exit(NULL);
}

void *do_reads(void *param)
{
  char *dbpath = (char *)param;
  msg(string("Reader opening ") + dbpath);
  try {
    // create the reader database once and continue to query it.
    Xapian::Database db = Xapian::Auto::open(dbpath);

    while(true) {
      Xapian::Enquire enq(db);
      Xapian::Query query("random");
      Xapian::MSet set;
      enq.set_query(query);

      bool dbModified;
      do {
        dbModified = false;
        try {
          set = enq.get_mset(0, 2500);
          // walk the results to exercize the mset iterator
          for(Xapian::MSetIterator i = set.begin(); i != set.end(); ++i);
        }
        catch(const Xapian::DatabaseModifiedError &) {
          dbModified = true;
          msg(string("Reader reopening ") + dbpath);
          db.reopen();
        }
      } while(dbModified);      // repeat only on error
      
      stringstream result;
      result << dbpath << " results: " << set.size();
      msg(result.str());
      sleep(1); // make the log more readable
    }
  }
  catch(Xapian::Error & err) {
    msg(string("Reader accessing ") + dbpath + ":  ERROR=" + err.get_msg());
  }

  pthread_exit(NULL);
}

void setup_writer_threads()
{
  pthread_create(&_writer1, NULL, do_writes, (void *)"1.db");
  pthread_create(&_writer2, NULL, do_writes, (void *)"2.db");
}

void setup_reader_threads()
{
  int i = 0;

  for(; i < MAX_THREADS; i++) {
    int success = pthread_create(&(_readers[i]), NULL, do_reads, (void *)"1.db") == 0;

    if(!success) {
      msg("setup_reader_threads:  Could not create thread");
    }
  }
}

int main(int argv, char **argc)
{
  int i = 0;

  pthread_mutex_init(&output_mutex, NULL);

  setup_writer_threads();
  // let the writer threads get going before we start a bunch of readers
  sleep(2);


  setup_reader_threads();
  msg("Readers started");

  // sit and wait for everything to finish
  // in a perfect world, they'll never finish
  pthread_join(_writer1, NULL);
  pthread_join(_writer2, NULL);
  for(; i < MAX_THREADS; i++) {
    pthread_join(_readers[i], NULL);
  }

  pthread_mutex_destroy(&output_mutex);
  return 0;
}


More information about the Xapian-discuss mailing list