高負荷のTarantoolアプリケーションをゼロから構築する

画像



2013 Mail.ru Group, , . , , . , — Tarantool. , , , .



Tarantool — — , . , C, Tarantool, … 250 , , .



Tarantool. , .



, Tarantool , , . , Tarantool, 2.2.



, Beanstalkd. , ( ), . - .



: , ; : , ( put); , ( take).





. put ready. take taken. taken (ack) , ready (release).





:







Tarantool — , , LuaJIT-. , init.lua, , box.cfg(), .



. :



require'strict'.on()

box.cfg{}

require'console'.start()
os.exit()


, - . , . 10-15



strict. Lua , . , Tarantool DEBUG, strict .



tarantool:



tarantool init.lua


- :



2020-07-09 20:00:11.344 [30043] main/102/init.lua C> Tarantool 2.2.3-1-g98ecc909a
2020-07-09 20:00:11.345 [30043] main/102/init.lua C> log level 5
2020-07-09 20:00:11.346 [30043] main/102/init.lua I> mapping 268435456 bytes for memtx tuple arena...
2020-07-09 20:00:11.347 [30043] main/102/init.lua I> mapping 134217728 bytes for vinyl tuple arena...
2020-07-09 20:00:11.370 [30043] main/102/init.lua I> instance uuid 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.371 [30043] main/102/init.lua I> initializing an empty data directory
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> assigned id 1 to replica 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> cluster uuid 7723bdf4-24e8-4957-bd6c-6ab502a1911c
2020-07-09 20:00:11.425 [30043] snapshot/101/main I> saving snapshot `./00000000000000000000.snap.inprogress'
2020-07-09 20:00:11.437 [30043] snapshot/101/main I> done
2020-07-09 20:00:11.439 [30043] main/102/init.lua I> ready to accept requests
2020-07-09 20:00:11.439 [30043] main/104/checkpoint_daemon I> scheduled next checkpoint for Thu Jul  9 21:11:59 2020
tarantool> 




queue.lua . , init.lua, .



queue init.lua:



require'strict'.on()

box.cfg{}

queue = require 'queue'

require'console'.start()
os.exit()


queue.lua.



, - . (space) — . , - . , , , (if_not_exists). Tarantool ( ). .



. id , - . , . , id. , , .



box.schema.create_space('queue',{ if_not_exists = true; })

box.space.queue:format( {
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'data';   type = '*'      },
} );

box.space.queue:create_index('primary', {
   parts = { 1,'number' };
   if_not_exists = true;
})


queue, , . : (put) (take).



. . , : . : R=READY T=TAKEN.



local queue = {}

local STATUS = {}
STATUS.READY = 'R'
STATUS.TAKEN = 'T'

function queue.put(...)

end

function queue.take(...)

end

return queue


put? . id READY. , clock.realtime. , ( , , ). , , , . , id, . , , .



:



local clock = require 'clock'
function gen_id()
    local new_id
    repeat
        new_id = clock.realtime64()
    until not box.space.queue:get(new_id)
    return new_id
end

function queue.put(...)
    local id = gen_id()
    return box.space.queue:insert{ id, STATUS.READY, { ... } }
end


put, Tarantool . , , (). . , Tarantool , MessagePack, .



tarantool> queue.put("hello")
---
- [1594325382148311477, 'R', ['hello']]
...

tarantool> queue.put("my","data",1,2,3)
---
- [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.put({ complex = { struct = "data" }})
---
- [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...


, , . , .



tarantool> box.space.queue:select()
---
- - [1594325382148311477, 'R', ['hello']]
  - [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
  - [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...


take. . , , READY. , , , . . Tarantool, key-value , , : , , .



, , — . . — id. .



box.space.queue:create_index('status', {
    parts = { 2, 'string', 1, 'number' };
    if_not_exists = true;
})


. , pairs. . , : , . , READY. . - , , . , , take, . , .



function queue.take()
    local found = box.space.queue.index.status
        :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
    if found then
        return box.space.queue
            :update( {found.id}, {{'=', 2, STATUS.TAKEN }})
    end
    return
end


, Tarantool . , , , update, . , . :



local F = {}
for no,def in pairs(box.space.queue:format()) do
    F[no] = def.name
    F[def.name] = no
end


:



box.space.queue:format( {
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'data';   type = '*'      },
} );

local F = {}
for no,def in pairs(box.space.queue:format()) do
    F[no] = def.name
    F[def.name] = no
end

box.space.queue:create_index('primary', {
   parts = { F.id, 'number' };
   if_not_exists = true;
})

box.space.queue:create_index('status', {
    parts = { F.status, 'string', F.id, 'number' };
    if_not_exists = true;
})


take :



function queue.take(...)
    for _,t in
        box.space.queue.index.status
        :pairs({ STATUS.READY },{ iterator='EQ' })
    do
        return box.space.queue:update({t.id},{
            { '=', F.status, STATUS.TAKEN }
        })
    end
    return
end


, . take . , box.space.queue:truncate():



tarantool> queue.put("my","data",1,2,3)
---
- [1594325927025602515, 'R', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.take()
---
- [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.take()
---
...


take , . take , , ready- ( R) . , select :



tarantool> box.space.queue:select()
---
- - [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...


, , , , - . - . : ack release. id . , . . , ready.



function queue.ack(id)
    local t = assert(box.space.queue:get{id},"Task not exists")
    if t and t.status == STATUS.TAKEN then
        return box.space.queue:delete{t.id}
    else
        error("Task not taken")
    end
end

function queue.release(id)
    local t = assert(box.space.queue:get{id},"Task not exists")
    if t and t.status == STATUS.TAKEN then
        return box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
    else
        error("Task not taken")
    end
end


, . , . R. take . , . take . . , -.



tarantool> queue.put("task 1")
---
- [1594326185712343931, 'R', ['task 1']]
...

tarantool> queue.put("task 2")
---
- [1594326187061434882, 'R', ['task 2']]
...

tarantool> task = queue.take() return task
---
- [1594326185712343931, 'T', ['task 1']]
...

tarantool> queue.release(task.id)
---
- [1594326185712343931, 'R', ['task 1']]
...

tarantool> task = queue.take() return task
---
- [1594326185712343931, 'T', ['task 1']]
...

tarantool> queue.ack(task.id)
---
- [1594326185712343931, 'T', ['task 1']]
...

tarantool> task = queue.take() return task
---
- [1594326187061434882, 'T', ['task 2']]
...

tarantool> queue.ack(task.id)
---
- [1594326187061434882, 'T', ['task 2']]
...

tarantool> task = queue.take() return task
---
- null
...


. , . , , . take, , . , , , , CPU.



while true do
    local task = queue.take()
    if task then
        -- ...
    end
end


, «» ( channel). . , FIFO- . , , . Lua-, - , , .



: N , , . , , - . , . put. , put. , put , put , . . Go, :





take. — : . , . , , .



, . «» , , .



local fiber = require 'fiber'
queue._wait = fiber.channel()
function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end
    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
end


: take , , . , . , , .



, init.lua fiber :



fiber = require 'fiber'


, . 0,1 . , 0,1 . . take 3. take . , 3 . , .



tarantool> do
    box.space.queue:truncate()
    fiber.create(function()
        fiber.sleep(0.1)
        queue.put("task 3")
    end)
    local start = fiber.time()
    return queue.take(3), { wait = fiber.time() - start }
end

---
- [1594326905489650533, 'T', ['task 3']]
- wait: 3.0017817020416
...


, take . put . , true.



, put , . , . . . , , , . . , , , .



function queue.put(...)
    local id = gen_id()

    if queue._wait:has_readers() then
        queue._wait:put(true,0)
    end

    return box.space.queue:insert{ id, STATUS.READY, { ... } }
end


take . 0,1 . take . , . , .



tarantool> do
    box.space.queue:truncate()
    fiber.create(function()
        fiber.sleep(0.1)
        queue.put("task 4")
    end)
    local start = fiber.time()
    return queue.take(3), { wait = fiber.time() - start }
end

---
- [1594327004302379957, 'T', ['task 4']]
- wait: 0.10164666175842
...


, . . init.lua box.cfg listen — , . . , , . .



require'strict'.on()
fiber = require 'fiber'

box.cfg{
    listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

queue = require 'queue'

require'console'.start()
os.exit()


-producer . Tarantool , Tarantool.



#!/usr/bin/env tarantool

if #arg < 1 then
    error("Need arguments",0)
end

local netbox = require 'net.box'
local conn = netbox.connect('127.0.0.1:3301')

local yaml = require 'yaml'
local res = conn:call('queue.put',{unpack(arg)})
print(yaml.encode(res))
conn:close()


$ tarantool producer.lua "hi"
--- [1594327270675788959, 'R', ['hi']]
...


(consumer) , take . , . . , .



#!/usr/bin/env tarantool

local netbox = require 'net.box'
local conn = netbox.connect('127.0.0.1:3301')
local yaml = require 'yaml'

while true do
    local task = conn:call('queue.take', { 1 })

    if task then
        print("Got task: ", yaml.encode(task))
        conn:call('queue.release', { task.id })
    else
        print "No more tasks"
    end
end


- .



$ tarantool consumer.lua 
Got task:
        --- [1594327270675788959, 'T', ['hi']]
...

ER_EXACT_MATCH: Invalid key part count in an exact match (expected 1, got 0)


. , , : . , , , , .



$ tarantool consumer.lua 
No more tasks
No more tasks


select , .



tarantool> box.space.queue:select()
---
- - [1594327004302379957, 'T', ['task 3']]
  - [1594327270675788959, 'T', ['hi']]
...


, .



Tarantool . , .



local log = require 'log'

box.session.on_connect(function()
    log.info( "connected %s from %s", box.session.id(), box.session.peer() )
end)

box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.peer() )
end)


2020-07-09 20:52:09.107 [32604] main/115/main I> connected 2 from 127.0.0.1:36652
2020-07-09 20:52:10.260 [32604] main/116/main I> disconnected 2 from nil
2020-07-09 20:52:10.823 [32604] main/116/main I> connected 3 from 127.0.0.1:36654
2020-07-09 20:52:11.541 [32604] main/115/main I> disconnected 3 from nil


session id, , IP . , . session.peer() getpeername(2) . , (getpeername ). C . Tarantool box.session.storage — , , , . , , , . .



box.session.on_connect(function()
    box.session.storage.peer = box.session.peer()
    log.info( "connected %s from %s", box.session.id(), box.session.storage.peer )
end)

box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
end)


. - . « ». , , . , , take.



queue.taken = {}; --   
queue.bysid = {}; --     


function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    queue.taken[ found.id ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ found.id ] = true

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
end


, . , ack release. . , , . , : « , ».



local function get_task( id )
    if not id then error("Task id required", 2) end
    local t = box.space.queue:get{id}
    if not t then
        error(string.format( "Task {%s} was not found", id ), 2)
    end
    if not queue.taken[id] then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end
    if queue.taken[id] ~= box.session.id() then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id, queue.taken[id], box.session.id() ), 2)
    end
    return t
end


ack release . get_task, , . .



function queue.ack(id)
    local t = get_task(id)
    queue.taken[ t.id ] = nil
    queue.bysid[ box.session.id() ][ t.id ] = nil
    return box.space.queue:delete{t.id}
end

function queue.release(id)
    local t = get_task(id)
    if queue._wait:has_readers() then queue._wait:put(true,0) end
    queue.taken[ t.id ] = nil
    queue.bysid[ box.session.id() ][ t.id ] = nil
    return box.space.queue
        :update({t.id},{{'=', F.status, STATUS.READY }})
end


R SQL Lua-c:



box.execute[[ update "queue" set "status" = 'R' where "status" = 'T' ]]
box.space.queue.index.status:pairs({'T'}):each(function(t) box.space.queue:update({t.id},{{'=',2,'R'}}) end)


consumer , task ID required.



$ tarantool consumer.lua 
Got task:
        --- [1594327004302379957, 'T', ['task 3']]
...

ER_PROC_LUA: queue.lua:113: Task id required


. Tarantool, . , . . , . , . :tomap{ names_only = true }:



function queue.put(...)
    --- ...
    return box.space.queue
        :insert{ id, STATUS.READY, { ... } }
        :tomap{ names_only = true }
end

function queue.take(timeout)
    --- ...
    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end

function queue.ack(id)
    --- ...
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id)
    --- ...
    return box.space.queue
        :update({t.id},{{'=', F.status, STATUS.READY }})
        :tomap{ names_only = true }
end

return queue


, .



$ tarantool consumer.lua 
Got task:
        --- {'status': 'T', 'data': ['hi'], 'id': 1594327270675788959}
...

ER_PROC_LUA: queue.lua:117: Task 1594327270675788959ULL not taken by anybody


, . , ID . - ULL.



LuaJIT: FFI (Foreign Function Interface). . , 1.



tarantool> t = {}
tarantool> t[1] = 1
tarantool> t["1"] = 2
tarantool> t[1LL] = 3
tarantool> t[1ULL] = 4
tarantool> t[1ULL] = 5
tarantool> t
---
- 1: 1
  1: 5
  1: 4
  '1': 2
  1: 3
...


, 2 ( + ). 3 (, , LL). , : 1, 2, 3, 4, 5. , , .



tarantool> return t[1], t['1'], t[1LL], t[1ULL]
---
- 1
- 2
- null
- null
...


, . Lua- (number string), LL (long long) ULL (unsigned long long) — . cdata. C. Lua- cdata , . , , . ULL , .



. , . - . MessagePack. Tarantool , Tarantool. , .



local msgpack = require 'msgpack'

local function keypack( key )
    return msgpack.encode( key )
end

local function keyunpack( data )
    return msgpack.decode( data )
end


take . get_task , , , int64. keypack, MessagePack. , , get_task, ack release .



function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = true

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end

local function get_task( id )
    if not id then error("Task id required", 2) end
    id = tonumber64(id)
    local key = keypack(id)
    local t = box.space.queue:get{id}
    if not t then
        error(string.format( "Task {%s} was not found", id ), 2)
    end
    if not queue.taken[key] then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end
    if queue.taken[key] ~= box.session.id() then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id, queue.taken[key], box.session.id() ), 2)
    end
    return t, key
end

function queue.ack(id)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    if queue._wait:has_readers() then queue._wait:put(true,0) end
    return box.space.queue
        :update({t.id},{{'=', F.status, STATUS.READY }})
        :tomap{ names_only = true }
end


, , , - . — ready. , take. session.storage, .



box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
    box.session.storage.destroyed = true

    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id);
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end)

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    if box.session.storage.destroyed then return end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = found.id

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end


:



tarantoolctl connect 127.0.0.1:3301 <<< 'queue.take()'


, , , , — ( ), taken. : .



while true do
    local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
    if not t then break end
    box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
    log.info("Autoreleased %s at start", t.id)
end


, .





. . , . put :W=WAITING.



box.space.queue:format( {
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'runat';  type = 'number' },
    { name = 'data';   type = '*'      },
} )

box.space.queue:create_index('runat', {
    parts = { F.runat, 'number', F.id, 'number' };
    if_not_exists = true;
})

STATUS.WAITING = 'W'


, ( ):



box.space.queue.drop()
box.snapshot()


.



put release delay. delay , WAITING , . . . , . , .



function queue.put(data, opts)
    local id = gen_id()

    local runat = 0
    local status = STATUS.READY

    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then
            queue._wait:put(true,0)
        end
    end

    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only=true }
end

function queue.release(id, opts)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil

    local runat = 0
    local status = STATUS.READY

    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true,0) end
    end

    return box.space.queue
        :update({t.id},{{ '=', F.status, status },{ '=', F.runat, runat }})
        :tomap{ names_only = true }
end


- , . , , .



. take, . , , . , , queue.runat.



queue._runat = fiber.create(function()
    fiber.name('queue.runat')
    while true do
        local remaining

        local now = clock.realtime()
        for _,t in box.space.queue.index.runat
            :pairs( { 0 }, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        {'=', F.status, STATUS.READY },
                        {'=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end

        if not remaining or remaining > 1 then remaining = 1 end
        fiber.sleep(remaining)
    end
end)




, , : . .



: , , .



function queue.stats()
    return {
        total   = box.space.queue:len(),
        ready   = box.space.queue.index.status:count({STATUS.READY}),
        waiting = box.space.queue.index.status:count({STATUS.WAITING}),
        taken   = box.space.queue.index.status:count({STATUS.TAKEN}),
    }    
end


tarantool> queue.stats()
---
- ready: 10
  taken: 2
  waiting: 5
  total: 17
...

tarantool> local clock = require 'clock' local s = clock.time() local r = queue.stats() return r, clock.time() - s
---
- ready: 10
  taken: 2
  waiting: 5
  total: 17
- 0.00057339668273926
...


. , . — . , - , , . stats . , . index:count — full-scan . .



queue._stats = {}
for k,v in pairs(STATUS) do
    queue._stats[v] = 0LL
end

for _,t in box.space.queue:pairs() do
    queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1
end

function queue.stats()
    return {
        total   = box.space.queue:len(),
        ready   = queue._stats[ STATUS.READY ],
        waiting = queue._stats[ STATUS.WAITING ],
        taken   = queue._stats[ STATUS.TAKEN ],
    }
end


, . . , . , , , . Tarantool . . space:update space:delete, . , . .



box.space.queue:on_replace(function(old,new)
    if old then
        queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1
    end
    if new then
        queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1
    end
end)


, , : space:truncate(). — _truncate.



box.space._truncate:on_replace(function(old,new)
    if new.id == box.space.queue.id then
        for k,v in pairs(queue._stats) do
            queue._stats[k] = 0LL
        end
    end
end)


. , , . Tarantool . , C.



graphite UDP:



local socket = require 'socket'
local errno = require 'errno'

local graphite_host = '127.0.0.1'
local graphite_port = 2003

local ai = socket.getaddrinfo(graphite_host, graphite_port, 1, { type = 'SOCK_DGRAM' })
local addr,port
for _,info in pairs(ai) do
   addr,port = info.host,info.port
   break
end
if not addr then error("Failed to resolve host") end

queue._monitor = fiber.create(function()
    fiber.name('queue.monitor')
    fiber.yield()
    local remote = socket('AF_INET', 'SOCK_DGRAM', 'udp')
    while true do
        for k,v in pairs(queue.stats()) do
            local msg = string.format("queue.stats.%s %s %s\n", k, tonumber(v), math.floor(fiber.time()))
            local res = remote:sendto(addr, port, msg)
            if not res then
                log.error("Failed to send: %s", errno.strerror(errno()))
            end
        end
        fiber.sleep(1)
    end
end)


TCP:



local socket = require 'socket'
local errno = require 'errno'

local graphite_host = '127.0.0.1'
local graphite_port = 2003

queue._monitor = fiber.create(function()
    fiber.name('queue.monitor')
    fiber.yield()
    while true do
        local remote =  require 'socket'.tcp_connect(graphite_host, graphite_port)
        if not remote then
            log.error("Failed to connect to graphite %s",errno.strerror())
            fiber.sleep(1)
        else
            while true do
                local data = {}
                for k,v in pairs(queue.stats()) do
                    table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time())))
                end
                data = table.concat(data,'')
                if not remote:send(data) then
                    log.error("%s",errno.strerror())
                    break
                end
                fiber.sleep(1)
            end
        end
    end
end)




, Tarantool, . , , . , .



Lua - require, package.loaded . require , . Lua , package.loaded[...] require. , , . - :



require'strict'.on()
fiber = require 'fiber'

box.cfg{
    listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

local not_first_run = rawget(_G,'_NOT_FIRST_RUN')
_NOT_FIRST_RUN = true
if not_first_run then
   for k,v in pairs(package.loaded) do
      if not preloaded[k] then
         package.loaded[k] = nil
      end
   end
else
   preloaded = {}
   for k,v in pairs(package.loaded) do
      preloaded[k] = true
   end
end

queue = require 'queue'

require'console'.start()
os.exit()


, package.reload, . , , , : package.reload().



require'strict'.on()
fiber = require 'fiber'

box.cfg{
    listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

require 'package.reload'

queue = require 'queue'

require'console'.start()
os.exit()


, -. , . , — . , .



local queue = {}
local old = rawget(_G,'queue')
if old then
    queue.taken = old.taken
    queue.bysid = old.bysid
    queue._triggers = old._triggers
    queue._stats = old._stats
    queue._wait = old._wait
    queue._runch = old._runch
    queue._runat = old._runat
else
    queue.taken = {}
    queue.bysid = {}
    queue._triggers = {}
    queue._stats = {}
    queue._wait = fiber.channel()
    queue._runch = fiber.cond()
    while true do
        local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
        if not t then break end
        box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
        log.info("Autoreleased %s at start", t.id)
    end
    for k,v in pairs(STATUS) do
        queue._stats[v] = 0LL
    end
    for _,t in box.space.queue:pairs() do
        queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1
    end
    log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end


. , . , . . . .



queue._triggers.on_replace = box.space.queue:on_replace(function(old,new)
    if old then
        queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1
    end
    if new then
        queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1
    end
end, queue._triggers.on_replace)

queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new)
    if new.id == box.space.queue.id then
        for k,v in pairs(queue._stats) do
            queue._stats[k] = 0LL
        end
    end
end, queue._triggers.on_truncate)

queue._triggers.on_connect = box.session.on_connect(function()
    box.session.storage.peer = box.session.peer()
    log.info( "connected %s from %s", box.session.id(), box.session.storage.peer )
end, queue._triggers.on_connect)

queue._triggers.on_disconnect = box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
    box.session.storage.destroyed = true

    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id);
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end, queue._triggers.on_disconnect)


— . , . while ... true, . , , , fiber.cond: condition variable, .



. , fiber.kill, : kill . : , . . : .



queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)

    while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end

    log.info("Started...")
    while package.reload.count == gen do
        local remaining

        local now = clock.realtime()

        for _,t in box.space.queue.index.runat
            :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end

        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end

    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
queue._runch:broadcast()


: , . :



if not fiber.self().storage.console then
    require'console'.start()
    os.exit()
end




, , Graphite TCP, . 20 . . 300 .



queue.lua:



local clock = require 'clock'
local errno = require 'errno'
local fiber = require 'fiber'
local log = require 'log'
local msgpack = require 'msgpack'
local socket = require 'socket'

box.schema.create_space('queue',{ if_not_exists = true; })

box.space.queue:format( {
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'runat';  type = 'number' },
    { name = 'data';   type = '*'      },
} );

local F = {}
for no,def in pairs(box.space.queue:format()) do
    F[no] = def.name
    F[def.name] = no
end

box.space.queue:create_index('primary', {
   parts = { F.id, 'number' };
   if_not_exists = true;
})

box.space.queue:create_index('status', {
    parts = { F.status, 'string', F.id, 'number' };
    if_not_exists = true;
})

box.space.queue:create_index('runat', {
    parts = { F.runat, 'number', F.id, 'number' };
    if_not_exists = true;
})

local STATUS = {}
STATUS.READY = 'R'
STATUS.TAKEN = 'T'
STATUS.WAITING = 'W'

local queue = {}
local old = rawget(_G,'queue')
if old then
    queue.taken = old.taken
    queue.bysid = old.bysid
    queue._triggers = old._triggers
    queue._stats = old._stats
    queue._wait = old._wait
    queue._runch = old._runch
    queue._runat = old._runat
else
    queue.taken = {}
    queue.bysid = {}
    queue._triggers = {}
    queue._stats = {}
    queue._wait = fiber.channel()
    queue._runch = fiber.cond()
    while true do
        local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
        if not t then break end
        box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
        log.info("Autoreleased %s at start", t.id)
    end

    for k,v in pairs(STATUS) do queue._stats[v] = 0LL end
    for _,t in box.space.queue:pairs() do
        queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1
    end
    log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end

local function gen_id()
    local new_id
    repeat
        new_id = clock.realtime64()
    until not box.space.queue:get(new_id)
    return new_id
end

local function keypack( key )
    return msgpack.encode( key )
end

local function keyunpack( data )
    return msgpack.decode( data )
end

queue._triggers.on_replace = box.space.queue:on_replace(function(old,new)
    if old then
        queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1
    end
    if new then
        queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1
    end
end, queue._triggers.on_replace)

queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new)
    if new.id == box.space.queue.id then
        for k,v in pairs(queue._stats) do
            queue._stats[k] = 0LL
        end
    end
end, queue._triggers.on_truncate)

queue._triggers.on_connect = box.session.on_connect(function()
    box.session.storage.peer = box.session.peer()
end, queue._triggers.on_connect)

queue._triggers.on_disconnect = box.session.on_disconnect(function()
    box.session.storage.destroyed = true
    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id);
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end, queue._triggers.on_disconnect)

queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)

    while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end

    log.info("Started...")
    while package.reload.count == gen do
        local remaining

        local now = clock.realtime()

        for _,t in box.space.queue.index.runat
            :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end

        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end

    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
queue._runch:broadcast()

local graphite_host = '127.0.0.1'
local graphite_port = 2003
queue._monitor = fiber.create(function(gen)
    fiber.name('queue.mon.'..gen)
    fiber.yield()
    while package.reload.count == gen do
        local remote =  require 'socket'.tcp_connect(graphite_host, graphite_port)
        if not remote then
            log.error("Failed to connect to graphite %s",errno.strerror())
            fiber.sleep(1)
        else
            while package.reload.count == gen do
                local data = {}
                for k,v in pairs(queue.stats()) do
                    table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time())))
                end
                data = table.concat(data,'')
                if not remote:send(data) then
                    log.error("%s",errno.strerror())
                    break
                end
                fiber.sleep(1)
            end
        end
    end
end, package.reload.count)

function queue.put(data, opts)
    local id = gen_id()

    local runat = 0
    local status = STATUS.READY
    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then
            queue._wait:put(true,0)
        end
    end

    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only=true }
end

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    if box.session.storage.destroyed then return end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = found.id

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end

local function get_task( id )
    if not id then error("Task id required", 2) end
    id = tonumber64(id)
    local key = keypack(id)
    local t = box.space.queue:get{id}
    if not t then
        error(string.format( "Task {%s} was not found", id ), 2)
    end
    if not queue.taken[key] then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end
    if queue.taken[key] ~= box.session.id() then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id, queue.taken[key], box.session.id() ), 2)
    end
    return t, key
end

function queue.ack(id)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id, opts)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil

    local runat = 0
    local status = STATUS.READY

    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true,0) end
    end

    return box.space.queue
        :update({t.id},{{'=', F.status, status },{ '=', F.runat, runat }})
        :tomap{ names_only = true }
end

function queue.stats()
    return {
        total   = box.space.queue:len(),
        ready   = queue._stats[ STATUS.READY ],
        waiting = queue._stats[ STATUS.WAITING ],
        taken   = queue._stats[ STATUS.TAKEN ],
    }
end

return queue


init.lua:



require'strict'.on()
fiber = require 'fiber'
require 'package.reload'

box.cfg{
    listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

queue = require 'queue'

if not fiber.self().storage.console then
    require'console'.start()
    os.exit()
end





14 Rebrain & Tarantool: application server — Tarantool. .




All Articles