From: Justin Mason
Date: Thu Jan  4 06:34:14 2007
Subject: Re: [PATCH] Advanced tagging and filtering

Hey, btw: something's up with the latest version. When I apply the
patches and run "make test", it hangs in t/10enq_string as soon as
pickup_queued_job() is called.  SVN trunk works fine, as does the
first patch on its own, but as soon as the second patch is applied
the hang appears.

Here's the current rev of the combined patch against SVN:
http://taint.org/x/2007/filter.patch

That contains your two patches, my documentation changes, and a "warn" in
the test script to illustrate this hang.

--j.
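
For readers following the filter discussion quoted below, here is a minimal sketch of the string-versus-qr// rule applied to the tag part of a NN.TIMESTAMP[.TAG].HASH[.PID.RAND] filename. The helper names tag_of, filter_to_re and tag_matches are hypothetical and not part of IPC::DirQueue, and the sample filenames use a placeholder HASH. Treating an undefined filter as "accept every file" is an assumption of this sketch.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of IPC::DirQueue): return the tag of a
# queue filename, or undef if the name carries no tag.
sub tag_of {
  my ($file) = @_;
  return unless defined $file && $file =~ /^\d/;
  my @parts = split /\./, $file;
  # 3 or 5 parts: no tag.  4 or 6 parts: the tag is the third part.
  return if @parts % 2;
  return $parts[2];
}

# Hypothetical helper: a plain string must equal the whole tag, so it is
# quoted and anchored; a qr// object is used unchanged.
sub filter_to_re {
  my ($filter) = @_;
  return $filter if ref($filter) eq 'Regexp';
  my $quoted = quotemeta $filter;
  return qr/^${quoted}$/;
}

# Returns 1 if the file passes the filter, 0 otherwise.
sub tag_matches {
  my ($file, $filter) = @_;
  return 0 unless defined $file && $file =~ /^\d/;
  # Assumption of this sketch: no filter means every queue file passes.
  return 1 unless defined $filter;
  my $tag = tag_of($file);
  return 0 unless defined $tag;
  my $re = filter_to_re($filter);
  return $tag =~ $re ? 1 : 0;
}

# Usage: compare a string filter with a regexp filter on the same names.
for my $file ('50.20040909232529941258.mail.HASH',
              '50.20040909232529941258.mailer.HASH.123.456',
              '50.20040909232529941258.HASH') {
  printf "%-45s string=%d regexp=%d\n", $file,
    tag_matches($file, 'mail'), tag_matches($file, qr/^mail/);
}
```

Unlike the closure in the quoted patch, which skips untagged files before the filter is consulted, this sketch lets an undefined filter accept every file; that choice is an assumption and not necessarily the intended behaviour.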

Malte S. Stretz writes:
> On Wednesday 03 January 2007 16:38 CET Justin Mason wrote:
> > backwards compatibility is not a problem here, so go ahead and
> > add the tag in addition to the hash.
> >
> > I think the best location is just before the hash, e.g.
> >
> >     50.20040909232529941258.TAG.HASH[.PID.RAND]
> >
> > (vs the simple
> >
> >     50.20040909232529941258.HASH[.PID.RAND]
> >
> > when tags are not in use.)
> >
> > > Actually, once people start to filter the files based on their name,
> > > the format must not change anymore in the future.  Maybe the filter
> > > should apply to the tag only...
> >
> > Yes, I think that's a good idea... people shouldn't have to worry about
> > the rest of the filename changing.
> 
> Ok, attached is a patch on top of the last one which changes the code as you 
> described it above.  Additionally there's a revamped test.
> 
> While I was hunting another totally stupid bug, I found out that I actually 
> forgot to add the $filter in the second call to pickup_job in wait_for_job.  
> Gave some nasty surprises.  And visit_all_jobs now supports $filter, too.
> 
> The syntax for $filter has changed a bit:  if it is a string, it must be 
> equal to the tag; if it is a RE created with qr//, it is matched as a RE.
> 
> Can you update the QUEUE DIRECTORY STRUCTURE section for me?  Your English 
> is a lot better than mine :)
> 
> Cheers,
> Malte
> part 3     text/x-diff               7472
> --- DirQueue.pm.patch1	2006-12-30 01:27:31.000000000 +0100
> +++ DirQueue.pm	2007-01-03 21:08:17.000000000 +0100
> @@ -173,8 +173,10 @@
>      $self->{ordered} = 1;
>    }
>    
> -  $self->{tag} ||= hash_string_to_filename($self->gethostname().$$);
> -  $self->{tag_sub} ||= sub { return $self->{tag}; };
> +  $self->{hash} ||= hash_string_to_filename($self->gethostname().$$);
> +  if (defined $self->{tag}) {
> +    $self->{tag_sub} ||= sub { return $self->{tag}; };
> +  }
>    $self->{tag_max_length} ||= 128;
>    if (!defined $self->{tag_warn}) {
>      $self->{tag_warn} = 1;
> @@ -456,9 +458,10 @@
>  
>  Pick up the next job in the queue, so that it can be processed.
>  
> -The parameter C<$filter> can be used to specify a regular expression which
> -is matched against the queued filename.  All files which don't match will be 
> -skipped.
> +The parameter C<$filter> can be used to specify either a string or a regular 
> +expression (with qr//) which is compared (in the first case) or matched 
> +(in the latter case) against the tag part of the queued filename.  All files 
> +which don't match will be skipped.
>  
>  If no job is available for processing, either because the queue is
>  empty or because other worker processes are already working on
> @@ -709,7 +712,7 @@
>      while (time == $qdirlaststat) {
>        Time::HiRes::usleep ($pollintvl);
>        dbg "wait_for_queued_job: spinning until time != stat $qdirlaststat";
> -      my $job = $self->pickup_queued_job();
> +      my $job = $self->pickup_queued_job($filter);
>        if ($job) { return $job; }
>      }
>  
> @@ -739,7 +742,7 @@
>  
>  ###########################################################################
>  
> -=item $job = $dq->visit_all_jobs($visitor, $visitcontext);
> +=item $job = $dq->visit_all_jobs($visitor, $visitcontext, $filter);
>  
>  Visit all the jobs in the queue, in a read-only mode.  Used to list
>  the entire queue.
> @@ -759,15 +762,17 @@
>    'active_host': the hostname on which the job is active
>    'active_pid': the process ID of the process which picked up the job
>  
> +The jobs can be filtered with C<$filter> as in C<pickup_queued_job()>.
> +
>  =cut
>  
>  sub visit_all_jobs {
> -  my ($self, $visitor, $visitcontext) = @_;
> +  my ($self, $visitor, $visitcontext, $filter) = @_;
>  
>    my $pathqueuedir = $self->q_subdir('queue');
>    my $pathactivedir = $self->q_subdir('active');
>  
> -  my $iter = $self->queue_iter_start($pathqueuedir);
> +  my $iter = $self->queue_iter_start($pathqueuedir, $filter);
>  
>    my $nextfile;
>    while (1) {
> @@ -1118,30 +1123,39 @@
>  
>    my @gmt = gmtime ($job->{time_submitted_secs});
>  
> -  # NN.20040718140300MMMM.tag[.rand]
> +  # NN.20040718140300MMMM[.tag].hash[.rand]
>    #
>    # NN = priority, default 50
>    # MMMM = microseconds from Time::HiRes::gettimeofday()
> -  # tag = some base64-ish string, default hash(hostname.$$)
> +  # hash = hash(hostname.$$)
> +  # tag = some base64-ish string
>    # hostname = current hostname
>  
> -  my $base = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d.",
> +  my $file = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d",
>          $job->{pri},
>          $gmt[5]+1900, $gmt[4]+1, $gmt[3], $gmt[2], $gmt[1], $gmt[0],
>          $job->{time_submitted_msecs});
>  
> +  # add tag (including leading dot, only if wanted) and hash
> +  $file .= $self->get_q_filename_tag($job).".".$self->{hash};
> +
>    # normally, this isn't used.  but if there's a collision,
>    # all retries after that will do this; in this case, the
>    # extra anti-collision stuff is useful
> -  my $extra = $addextra ? ".".$$.".".$self->get_random_int() : "";
> +  if ($addextra) {
> +    $file .= ".".$$.".".$self->get_random_int();
> +  }
>  
> -  return $base.$self->get_q_filename_tag($job, $base, $extra).$extra;
> +  return $file;
>  }
>  
>  sub get_q_filename_tag {
> -  my($self, $job, $base, $extra) = @_;
> -  # create a (new?) tag
> -  my $str = $self->{tag_sub}->($job, $base, $extra) || '';
> +  my($self, $job) = @_;
> +  # return an empty string if there's nothing to do
> +  return '' unless defined $self->{tag_sub};
> +  # create a (new?) tag, possibly empty
> +  my $str = $self->{tag_sub}->($job);
> +  return '' unless defined $str;
>    # weed out all dangerous chars
>    my $tag = filter_unsafe_chars($str);
>    # limit the length
> @@ -1150,7 +1164,7 @@
>    if ($self->{tag_warn} && $tag ne $str) {
>      warn "IPC::DirQueue: the tag was filtered\n";
>    }
> -  return $tag;
> +  return ".$tag";
>  }
>  
>  sub hash_string_to_filename {
> @@ -1242,23 +1256,37 @@
>  
>  sub queue_iter_start {
>    my ($self, $pathqueuedir, $filter, $type) = @_;
> -  
> -  $filter ||= qr/^/;
> +
> +  $filter = qr// unless defined $filter;
>    dbg ("queue iter: filter $filter in $pathqueuedir");
>    unless (ref $filter eq 'CODE') {
>      # we need to copy $filter here else the closure will get annoyed
> -    my $f = $filter;
> +    my $re = $filter;
> +    unless (ref ($re) eq 'Regexp') {
> +      $re = quotemeta($re);
> +      $re = qr/^${re}$/;
> +    }
>      $filter = sub {
> -                if (wantarray) { # grep is picky about list context
> -                  return grep { /^\d/ && /$f/ } @_;
> -                }
> -                else {
> -                  $_ = shift;
> -                  return unless defined;
> -                  return unless /^\d/;
> -                  return unless /$f/;
> -                  return $_;
> +                # we can't use grep because it is picky about 
> +                # list context and this sub can be called for
> +                # single files and grep would return the count
> +                # in those cases
> +                my @r;
> +                while (@_) {
> +                  my $f = shift;
> +                  next unless defined $f;
> +                  next unless $f =~ /^\d/;
> +                  # we've got to go split here to make the ^
> +                  # and $ anchors work 
> +                  my @f = split(/\./, $f);
> +                  # does this file have a tag (ie. even number
> +                  # of elements)?
> +                  next if scalar (@f) % 2;
> +                  # apply the tag
> +                  next unless $f[2] =~ $re;
> +                  push(@r, $f);
>                  }
> +                return wantarray ? @r : $r[0];
>                };
>    }
>    
> @@ -1358,11 +1386,11 @@
>      @files = sort $iter->{filter}->(readdir(DIR));
>      closedir DIR;
>    }
> -  
> +
>    if (scalar @files <= 0) {
>      return if $self->queuedir_is_bad($iter->{dir});
>    }
> -  
> +
>    $iter->{type} = 'files';
>    $iter->{files} = \@files;
>    return $iter;
> @@ -1555,8 +1583,7 @@
>  
>  The filename format is as follows:
>  
> -    50.20040909232529941258.TAG[.PID.RAND]
> -    |        base          |tag | extra |
> +    50.20040909232529941258[.TAG].HASH[.PID.RAND]
>  
>  The first two digits (C<50>) are the priority of the job.  Lower priority
>  numbers are run first.  C<20040909232529> is the current date and time when the
> @@ -1568,10 +1595,8 @@
>  possible to set this manually with the C<tag> parameter of the constructor, or even
>  to have it generated dynamically by the routine set with C<tag_sub>.
>  
> -The sub routine is called with the parameters C<$job>, C<$base> and C<$extra>.  The
> -first is the job currently enqueued and thus allows access to the metadata.  The 
> -latter two parameters correspond to the already known parts of the filename as
> -shown above (including the dots).
> +The sub routine is called with a single parameter C<$job>.  This is the job 
> +currently enqueued and thus allows access to the metadata.
>  
>  Note that only characters from the set [A-Za-z0-9+_] are allowed and the length
>  of the string is limited to 128 characters.  If any of these restrictions are