-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworker_ex.pl
67 lines (64 loc) · 1.39 KB
/
worker_ex.pl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env perl
# Example worker script: registers with the jobs.* Lua procedures over
# EV::Tarantool and processes the tasks it is handed.
use 5.010;
use strict;
# Surface undefined values and similar mistakes instead of hiding them.
use warnings;
use lib::abs '../../../libs/*/lib','../../../libs/*/blib/lib','../../../libs/*/blib/arch';
use EV;
use EV::Tarantool;
use JSON::XS;
use Scalar::Util 'weaken';
use DDP;
# Package-global JSON::XS codec with the utf8 option enabled.
# NOTE(review): not referenced anywhere else in the visible script.
our $JSON = JSON::XS->new->utf8;
# Use for testing latency:
# lua -tonumber64(jobs.task('any',tostring(box.time64()),1)) + box.time64()
# Tarantool client for this worker.  All of the work happens inside the
# `connected` callback: register through jobs.worker, then keep polling
# jobs.work and acknowledge every received task through jobs.done.
my $tnt = EV::Tarantool->new({
    timeout => 10,
    host    => 0,
    port    => 33013,

    # Called with the connection object as its first argument.
    connected => sub {
        my $conn = shift;

        # Register this process as a worker.
        # NOTE(review): `out => 'p'` is handed to EV::Tarantool as a call
        # option -- confirm its exact meaning against the module docs.
        $conn->lua('jobs.worker', [], { out => 'p' }, sub {
            my $reply = shift;
            if ($reply) {
                # First field of the first returned tuple is the worker id.
                my $wid = $reply->{tuples}[0][0];
                say "Registered as worker id $wid";
                my $timeout = 1;

                # Self-rescheduling poller: ask for work, acknowledge every
                # task in the reply, then ask again.
                my $poll;
                $poll = sub {
                    $conn->lua('jobs.work', [ $wid, $timeout ], sub {
                        my $batch = shift;
                        if ($batch) {
                            if ($batch->{count} > 0) {
                                # p $batch->{tuples};
                                for my $task (@{ $batch->{tuples} }) {
                                    # Each tuple: task id followed by its data fields.
                                    my ($tid, @data) = @{$task};
                                    say "got task $tid";
                                    # ...
                                    #my $j;$j = EV::timer 1,0,sub { undef $j;
                                    # Report the task as done, echoing its data back;
                                    # warn with the remaining arguments on a false result.
                                    $conn->lua('jobs.done', [ $tid, @data ], sub {
                                        shift or warn "@_";
                                    });
                                    #};
                                }
                            }
                            # Immediately issue the next jobs.work request.
                            $poll->();
                        }
                        else {
                            # False first argument: dump whatever else the
                            # callback received and stop re-polling.
                            p @_;
                            return;
                        }
                    });
                };

                # Order matters here: start polling first, arm the timer,
                # and only then weaken the variable.
                $poll->();

                # One-shot timer at half the poll timeout that invokes the
                # poller once more if it still exists.
                # NOTE(review): presumably meant to keep two overlapping
                # jobs.work requests in flight -- confirm.
                my $kick;
                $kick = EV::timer($timeout / 2, 0, sub {
                    undef $kick;
                    $poll and $poll->();
                });

                # The sub refers to $poll, which in turn holds the sub;
                # weakening the variable breaks that self-reference.
                # NOTE(review): afterwards the sub is expected to stay alive
                # only through callbacks still in flight -- confirm.
                weaken($poll);
            }
            else {
                # Registration returned a false first argument: dump the
                # remaining callback arguments.
                p @_;
            }
        });
        return;
    },

    # Drop the first argument and warn with the rest.
    disconnected => sub {
        my (undef, @reason) = @_;
        warn "@reason";
    },
});

# Open the connection and hand control to the EV event loop.
$tnt->connect;
EV::loop();