[Date Prev][Date Next]
[Chronological]
[Thread]
[Top]
RE: slurpd replication queue
Hola,
This is from 2.0.19 code so it should be fairly current.  I'm by no means an
expert in this stuff but I hope this helps.
"I could not see any clear_rq method in the main thread."
That is because it is called rq_gc( ).  In fm.c the fm( ) main file manager
routine checks if the slapd replog is not empty and copies the entry to
slurpd's copy of the replog.   It then populates the rq queue :-) (calls
populate_queue which calls rq_add and the replication entries are processed
by the threads (I think ldap_pvt_thread_yield(); is the one that gets the
worker thread going...).
Once the worker thread does it's thing we call rq_gc(sglob->rq) which does
garbage collection in the queue.  It clears the queue link list of all
elements (calls rq->rq_delhead( rq ) until head is NULL) because it asumes
that the new work has already been copied to the replog file and the worker
thread has done it's thing with the replication entries.  
rq_needtrim trims the replication file (rq_write is called to trim it) if
needed and then we keep waiting for something to show up in the replogfile
and the process starts again.
void *
fm(
    void *arg
)
{
    int rc;
<...stuff removed...>
    while ( !sglob->slurpd_shutdown ) {
        if ( file_nonempty( sglob->slapd_replogfile )) {
            /* New work found - copy to slurpd replog file */
            Debug( LDAP_DEBUG_ARGS, "new work in %s\n",
                    sglob->slapd_replogfile, 0, 0 );
            if (( rc = copy_replog( sglob->slapd_replogfile,
                    sglob->slurpd_replogfile )) == 0 )  {
                populate_queue( sglob->slurpd_replogfile );
            } else {
                if ( rc < 0 ) {
                    Debug( LDAP_DEBUG_ANY,
                            "Fatal error while copying replication log\n",
                            0, 0, 0 );
                    sglob->slurpd_shutdown = 1;
                }
            }
        } else {
            ldap_pvt_thread_sleep( sglob->no_work_interval );
        }
        /* Garbage-collect queue */
        sglob->rq->rq_gc( sglob->rq );
        /* Trim replication log file, if needed */
        if ( sglob->rq->rq_needtrim( sglob->rq )) {
            FILE *fp, *lfp;
            if (( rc = acquire_lock( sglob->slurpd_replogfile, &fp,
                    &lfp )) < 0 ) {
                Debug( LDAP_DEBUG_ANY,
                        "Error: cannot acquire lock on \"%s\" for
trimming\n",
                        sglob->slurpd_replogfile, 0, 0 );
            } else {
                sglob->rq->rq_write( sglob->rq, fp );
                (void) relinquish_lock( sglob->slurpd_replogfile, fp, lfp );
            }
        }
    }
    Debug( LDAP_DEBUG_ARGS, "fm: exiting\n", 0, 0, 0 );
    return NULL;
}
In rq.c rq_gc is called to perform garbage collection in the queue:
/*
 * Garbage-collect the replication queue.  Locking is handled internally.
 */
static void
Rq_gc(
    Rq  *rq
)
{
    if ( rq == NULL ) {
        Debug( LDAP_DEBUG_ANY, "Rq_gc: rq is NULL!\n", 0, 0, 0 );
        return;
    }
    rq->rq_lock( rq );
    while (( rq->rq_head != NULL ) &&
            ( rq->rq_head->re_getrefcnt( rq->rq_head ) == 0 )) {
        rq->rq_delhead( rq );
        rq->rq_ndel++;  /* increment count of deleted entries */
    }
    rq->rq_unlock( rq );
    return;
}
In rq.c rq_write is where the replication file is trimmed (notice that
rq->rq_lasttrim = now resets the last trim time to now):
Rq_write(
    Rq          *rq,
    FILE        *fp
)
{
    Re          *re;
    time_t      now;
    if ( rq == NULL ) {
        return -1;
    }
    Debug( LDAP_DEBUG_ARGS, "re-write on-disk replication log\n",
            0, 0, 0 );
#ifndef SEEK_SET
#define SEEK_SET 0
#endif
    fseek( fp, 0L, SEEK_SET );  /* Go to beginning of file */
    rq->rq_lock( rq );
    for ( re = rq->rq_gethead( rq ); re != NULL; re = rq->rq_getnext( re ))
{
        if ( re->re_write( NULL, re, fp ) < 0 ) {
            fflush( fp );
            rq->rq_unlock( rq );
            return -1;
        }
    }
    fflush( fp );
    sglob->srpos = ftell( fp ); /* update replog file position */
    /* and truncate to correct len */
    if ( ftruncate( fileno( fp ), sglob->srpos ) < 0 ) {
        Debug( LDAP_DEBUG_ANY, "Error truncating replication log: %s\n",
                sys_errlist[ errno ], 0, 0 );
    }
    rq->rq_ndel = 0;    /* reset count of deleted re's */
    time( &now );
    rq->rq_lasttrim = now;      /* reset last trim time */
    rq->rq_unlock( rq );
    return 0;
}
Tomás V. Arredondo
Member of Technical Staff
Unisphere Networks
7800 Congress Ave., Suite 100, Boca Raton, Fl 33487
Tel.(561)981-7051, Fax.(561)981-7001
tarredondo@unispherenetworks.com
-----Original Message-----
From: Girish Dharmapurikar [ mailto:girishd@dh.cit.alcatel.fr
<mailto:girishd@dh.cit.alcatel.fr> ]
Sent: Friday, March 15, 2002 8:50 AM
To: Arredondo, Tomas
Subject: slurpd replication queue
Hi Tomás,
Ref. to your posting
http://www.openldap.org/lists/openldap-software/200106/msg00452.html
<http://www.openldap.org/lists/openldap-software/200106/msg00452.html> 
I have a small question, if you still remember the subject. I have a doubt
regarding the flushing of the 'rq' stack.
As I understand the OpenLDAP source, there is a 'rq' stack having 're'
entries.
The 'rq' is filled by the main thread. For each replica there is one thread
initiated by the main thread. The filled 'rq' is accessable to all threads
for
the replication. But I fail to understand where is this 'rq' stack
cleared/flushed at the end. I could not see any clear_rq method in the main
thread.
Can you please help.
Thanks in advance,
Girish