Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lint(luacheck): limit the maximum length of Lua code to 100. #1525

Merged
merged 1 commit into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .luacheckrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
std = "ngx_lua"
unused_args = false
redefined = false
max_line_length = 100
53 changes: 30 additions & 23 deletions apisix/utils/batch-processor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,64 +42,70 @@ local schema = {
}


local function schedule_func_exec(batch_processor, delay, batch)
local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
local function schedule_func_exec(self, delay, batch)
local hdl, err = timer_at(delay, execute_func, self, batch)
if not hdl then
core.log.error("failed to create process timer: ", err)
return
end
end


function execute_func(premature, batch_processor, batch)
function execute_func(premature, self, batch)
if premature then
return
end

local ok, err = batch_processor.func(batch.entries, batch_processor.batch_max_size)
local ok, err = self.func(batch.entries, self.batch_max_size)
if not ok then
core.log.error("Batch Processor[", batch_processor.name, "] failed to process entries: ", err)
core.log.error("Batch Processor[", self.name,
"] failed to process entries: ", err)
batch.retry_count = batch.retry_count + 1
if batch.retry_count <= batch_processor.max_retry_count then
schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
if batch.retry_count <= self.max_retry_count then
schedule_func_exec(self, self.retry_delay,
batch)
else
core.log.error("Batch Processor[", batch_processor.name,"] exceeded ",
"the max_retry_count[", batch.retry_count,"] dropping the entries")
core.log.error("Batch Processor[", self.name,"] exceeded ",
"the max_retry_count[", batch.retry_count,
"] dropping the entries")
end
return
end

core.log.debug("Batch Processor[", batch_processor.name ,"] successfully processed the entries")
core.log.debug("Batch Processor[", self.name,
"] successfully processed the entries")
end


local function flush_buffer(premature, batch_processor)
local function flush_buffer(premature, self)
if premature then
return
end

if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
core.log.debug("Batch Processor[", batch_processor.name ,"] buffer ",
if now() - self.last_entry_t >= self.inactive_timeout or
now() - self.first_entry_t >= self.buffer_duration
then
core.log.debug("Batch Processor[", self.name ,"] buffer ",
"duration exceeded, activating buffer flush")
batch_processor:process_buffer()
batch_processor.is_timer_running = false
self:process_buffer()
self.is_timer_running = false
return
end

-- buffer duration did not exceed or the buffer is active, extending the timer
core.log.debug("Batch Processor[", batch_processor.name ,"] extending buffer timer")
create_buffer_timer(batch_processor)
-- buffer duration did not exceed or the buffer is active,
-- extending the timer
core.log.debug("Batch Processor[", self.name ,"] extending buffer timer")
create_buffer_timer(self)
end


function create_buffer_timer(batch_processor)
local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
function create_buffer_timer(self)
local hdl, err = timer_at(self.inactive_timeout, flush_buffer, self)
if not hdl then
core.log.error("failed to create buffer timer: ", err)
return
end
batch_processor.is_timer_running = true
self.is_timer_running = true
end


Expand Down Expand Up @@ -149,7 +155,8 @@ function Batch_Processor:push(entry)
self.last_entry_t = now()

if self.batch_max_size <= #entries then
core.log.debug("Batch Processor[", self.name ,"] batch max size has exceeded")
core.log.debug("Batch Processor[", self.name ,
"] batch max size has exceeded")
self:process_buffer()
end

Expand Down