skynet.lua 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170
  1. -- read https://github.com/cloudwu/skynet/wiki/FAQ for the module "skynet.core"
  2. local c = require "skynet.core"
  3. local skynet_require = require "skynet.require"
  4. local tostring = tostring
  5. local coroutine = coroutine
  6. local assert = assert
  7. local pairs = pairs
  8. local pcall = pcall
  9. local table = table
  10. local next = next
  11. local tremove = table.remove
  12. local tinsert = table.insert
  13. local tpack = table.pack
  14. local tunpack = table.unpack
  15. local traceback = debug.traceback
  16. local cresume = coroutine.resume
  17. local running_thread = nil
  18. local init_thread = nil
  19. local function coroutine_resume(co, ...)
  20. running_thread = co
  21. return cresume(co, ...)
  22. end
  23. local coroutine_yield = coroutine.yield
  24. local coroutine_create = coroutine.create
  25. local proto = {}
  26. local skynet = {
  27. -- read skynet.h
  28. PTYPE_TEXT = 0,
  29. PTYPE_RESPONSE = 1,
  30. PTYPE_MULTICAST = 2,
  31. PTYPE_CLIENT = 3,
  32. PTYPE_SYSTEM = 4,
  33. PTYPE_HARBOR = 5,
  34. PTYPE_SOCKET = 6,
  35. PTYPE_ERROR = 7,
  36. PTYPE_QUEUE = 8, -- used in deprecated mqueue, use skynet.queue instead
  37. PTYPE_DEBUG = 9,
  38. PTYPE_LUA = 10,
  39. PTYPE_SNAX = 11,
  40. PTYPE_TRACE = 12, -- use for debug trace
  41. }
  42. -- code cache
  43. skynet.cache = require "skynet.codecache"
  44. skynet._proto = proto
  45. function skynet.register_protocol(class)
  46. local name = class.name
  47. local id = class.id
  48. assert(proto[name] == nil and proto[id] == nil)
  49. assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
  50. proto[name] = class
  51. proto[id] = class
  52. end
  53. local session_id_coroutine = {}
  54. local session_coroutine_id = {}
  55. local session_coroutine_address = {}
  56. local session_coroutine_tracetag = {}
  57. local unresponse = {}
  58. local wakeup_queue = {}
  59. local sleep_session = {}
  60. local watching_session = {}
  61. local error_queue = {}
  62. local fork_queue = { h = 1, t = 0 }
  63. local auxsend, auxtimeout
  64. do ---- avoid session rewind conflict
  65. local csend = c.send
  66. local cintcommand = c.intcommand
  67. local dangerzone
  68. local dangerzone_size = 0x1000
  69. local dangerzone_low = 0x70000000
  70. local dangerzone_up = dangerzone_low + dangerzone_size
  71. local set_checkrewind -- set auxsend and auxtimeout for safezone
  72. local set_checkconflict -- set auxsend and auxtimeout for dangerzone
  73. local function reset_dangerzone(session)
  74. dangerzone_up = session
  75. dangerzone_low = session
  76. dangerzone = { [session] = true }
  77. for s in pairs(session_id_coroutine) do
  78. if s < dangerzone_low then
  79. dangerzone_low = s
  80. elseif s > dangerzone_up then
  81. dangerzone_up = s
  82. end
  83. dangerzone[s] = true
  84. end
  85. dangerzone_low = dangerzone_low - dangerzone_size
  86. end
  87. -- in dangerzone, we should check if the next session already exist.
  88. local function checkconflict(session)
  89. if session == nil then
  90. return
  91. end
  92. local next_session = session + 1
  93. if next_session > dangerzone_up then
  94. -- leave dangerzone
  95. reset_dangerzone(session)
  96. assert(next_session > dangerzone_up)
  97. set_checkrewind()
  98. else
  99. while true do
  100. if not dangerzone[next_session] then
  101. break
  102. end
  103. if not session_id_coroutine[next_session] then
  104. reset_dangerzone(session)
  105. break
  106. end
  107. -- skip the session already exist.
  108. next_session = c.genid() + 1
  109. end
  110. end
  111. -- session will rewind after 0x7fffffff
  112. if next_session == 0x80000000 and dangerzone[1] then
  113. assert(c.genid() == 1)
  114. return checkconflict(1)
  115. end
  116. end
  117. local function auxsend_checkconflict(addr, proto, msg, sz)
  118. local session = csend(addr, proto, nil, msg, sz)
  119. checkconflict(session)
  120. return session
  121. end
  122. local function auxtimeout_checkconflict(timeout)
  123. local session = cintcommand("TIMEOUT", timeout)
  124. checkconflict(session)
  125. return session
  126. end
  127. local function auxsend_checkrewind(addr, proto, msg, sz)
  128. local session = csend(addr, proto, nil, msg, sz)
  129. if session and session > dangerzone_low and session <= dangerzone_up then
  130. -- enter dangerzone
  131. set_checkconflict(session)
  132. end
  133. return session
  134. end
  135. local function auxtimeout_checkrewind(timeout)
  136. local session = cintcommand("TIMEOUT", timeout)
  137. if session and session > dangerzone_low and session <= dangerzone_up then
  138. -- enter dangerzone
  139. set_checkconflict(session)
  140. end
  141. return session
  142. end
  143. set_checkrewind = function()
  144. auxsend = auxsend_checkrewind
  145. auxtimeout = auxtimeout_checkrewind
  146. end
  147. set_checkconflict = function(session)
  148. reset_dangerzone(session)
  149. auxsend = auxsend_checkconflict
  150. auxtimeout = auxtimeout_checkconflict
  151. end
  152. -- in safezone at the beginning
  153. set_checkrewind()
  154. end
  155. do ---- request/select
  156. local function send_requests(self)
  157. local sessions = {}
  158. self._sessions = sessions
  159. local request_n = 0
  160. local err
  161. for i = 1, #self do
  162. local req = self[i]
  163. local addr = req[1]
  164. local p = proto[req[2]]
  165. assert(p.unpack)
  166. local tag = session_coroutine_tracetag[running_thread]
  167. if tag then
  168. c.trace(tag, "call", 4)
  169. c.send(addr, skynet.PTYPE_TRACE, 0, tag)
  170. end
  171. local session = auxsend(addr, p.id , p.pack(tunpack(req, 3, req.n)))
  172. if session == nil then
  173. err = err or {}
  174. err[#err+1] = req
  175. else
  176. sessions[session] = req
  177. watching_session[session] = addr
  178. session_id_coroutine[session] = self._thread
  179. request_n = request_n + 1
  180. end
  181. end
  182. self._request = request_n
  183. return err
  184. end
  185. local function request_thread(self)
  186. while true do
  187. local succ, msg, sz, session = coroutine_yield "SUSPEND"
  188. if session == self._timeout then
  189. self._timeout = nil
  190. self.timeout = true
  191. else
  192. watching_session[session] = nil
  193. local req = self._sessions[session]
  194. local p = proto[req[2]]
  195. if succ then
  196. self._resp[session] = tpack( p.unpack(msg, sz) )
  197. else
  198. self._resp[session] = false
  199. end
  200. end
  201. skynet.wakeup(self)
  202. end
  203. end
  204. local function request_iter(self)
  205. return function()
  206. if self._error then
  207. -- invalid address
  208. local e = tremove(self._error)
  209. if e then
  210. return e
  211. end
  212. self._error = nil
  213. end
  214. local session, resp = next(self._resp)
  215. if session == nil then
  216. if self._request == 0 then
  217. return
  218. end
  219. if self.timeout then
  220. return
  221. end
  222. skynet.wait(self)
  223. if self.timeout then
  224. return
  225. end
  226. session, resp = next(self._resp)
  227. end
  228. self._request = self._request - 1
  229. local req = self._sessions[session]
  230. self._resp[session] = nil
  231. self._sessions[session] = nil
  232. return req, resp
  233. end
  234. end
  235. local request_meta = {} ; request_meta.__index = request_meta
  236. function request_meta:add(obj)
  237. assert(type(obj) == "table" and not self._thread)
  238. self[#self+1] = obj
  239. return self
  240. end
  241. request_meta.__call = request_meta.add
  242. function request_meta:close()
  243. if self._request > 0 then
  244. local resp = self._resp
  245. for session, req in pairs(self._sessions) do
  246. if not resp[session] then
  247. session_id_coroutine[session] = "BREAK"
  248. watching_session[session] = nil
  249. end
  250. end
  251. self._request = 0
  252. end
  253. if self._timeout then
  254. session_id_coroutine[self._timeout] = "BREAK"
  255. self._timeout = nil
  256. end
  257. end
  258. request_meta.__close = request_meta.close
  259. function request_meta:select(timeout)
  260. assert(self._thread == nil)
  261. self._thread = coroutine_create(request_thread)
  262. self._error = send_requests(self)
  263. self._resp = {}
  264. if timeout then
  265. self._timeout = auxtimeout(timeout)
  266. session_id_coroutine[self._timeout] = self._thread
  267. end
  268. local running = running_thread
  269. coroutine_resume(self._thread, self)
  270. running_thread = running
  271. return request_iter(self), nil, nil, self
  272. end
  273. function skynet.request(obj)
  274. local ret = setmetatable({}, request_meta)
  275. if obj then
  276. return ret(obj)
  277. end
  278. return ret
  279. end
  280. end
  281. -- suspend is function
  282. local suspend
  283. ----- monitor exit
  284. local function dispatch_error_queue()
  285. local session = tremove(error_queue,1)
  286. if session then
  287. local co = session_id_coroutine[session]
  288. session_id_coroutine[session] = nil
  289. return suspend(co, coroutine_resume(co, false, nil, nil, session))
  290. end
  291. end
  292. local function _error_dispatch(error_session, error_source)
  293. skynet.ignoreret() -- don't return for error
  294. if error_session == 0 then
  295. -- error_source is down, clear unreponse set
  296. for resp, address in pairs(unresponse) do
  297. if error_source == address then
  298. unresponse[resp] = nil
  299. end
  300. end
  301. for session, srv in pairs(watching_session) do
  302. if srv == error_source then
  303. tinsert(error_queue, session)
  304. end
  305. end
  306. else
  307. -- capture an error for error_session
  308. if watching_session[error_session] then
  309. tinsert(error_queue, error_session)
  310. end
  311. end
  312. end
  313. -- coroutine reuse
  314. local coroutine_pool = setmetatable({}, { __mode = "kv" })
  315. local function co_create(f)
  316. local co = tremove(coroutine_pool)
  317. if co == nil then
  318. co = coroutine_create(function(...)
  319. f(...)
  320. while true do
  321. local session = session_coroutine_id[co]
  322. if session and session ~= 0 then
  323. local source = debug.getinfo(f,"S")
  324. skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
  325. session,
  326. skynet.address(session_coroutine_address[co]),
  327. source.source, source.linedefined))
  328. end
  329. -- coroutine exit
  330. local tag = session_coroutine_tracetag[co]
  331. if tag ~= nil then
  332. if tag then c.trace(tag, "end") end
  333. session_coroutine_tracetag[co] = nil
  334. end
  335. local address = session_coroutine_address[co]
  336. if address then
  337. session_coroutine_id[co] = nil
  338. session_coroutine_address[co] = nil
  339. end
  340. -- recycle co into pool
  341. f = nil
  342. coroutine_pool[#coroutine_pool+1] = co
  343. -- recv new main function f
  344. f = coroutine_yield "SUSPEND"
  345. f(coroutine_yield())
  346. end
  347. end)
  348. else
  349. -- pass the main function f to coroutine, and restore running thread
  350. local running = running_thread
  351. coroutine_resume(co, f)
  352. running_thread = running
  353. end
  354. return co
  355. end
  356. local function dispatch_wakeup()
  357. while true do
  358. local token = tremove(wakeup_queue,1)
  359. if token then
  360. local session = sleep_session[token]
  361. if session then
  362. local co = session_id_coroutine[session]
  363. local tag = session_coroutine_tracetag[co]
  364. if tag then c.trace(tag, "resume") end
  365. session_id_coroutine[session] = "BREAK"
  366. return suspend(co, coroutine_resume(co, false, "BREAK", nil, session))
  367. end
  368. else
  369. break
  370. end
  371. end
  372. return dispatch_error_queue()
  373. end
  374. -- suspend is local function
  375. function suspend(co, result, command)
  376. if not result then
  377. local session = session_coroutine_id[co]
  378. if session then -- coroutine may fork by others (session is nil)
  379. local addr = session_coroutine_address[co]
  380. if session ~= 0 then
  381. -- only call response error
  382. local tag = session_coroutine_tracetag[co]
  383. if tag then c.trace(tag, "error") end
  384. c.send(addr, skynet.PTYPE_ERROR, session, "")
  385. end
  386. session_coroutine_id[co] = nil
  387. end
  388. session_coroutine_address[co] = nil
  389. session_coroutine_tracetag[co] = nil
  390. skynet.fork(function() end) -- trigger command "SUSPEND"
  391. local tb = traceback(co,tostring(command))
  392. coroutine.close(co)
  393. error(tb)
  394. end
  395. if command == "SUSPEND" then
  396. return dispatch_wakeup()
  397. elseif command == "QUIT" then
  398. coroutine.close(co)
  399. -- service exit
  400. return
  401. elseif command == "USER" then
  402. -- See skynet.coutine for detail
  403. error("Call skynet.coroutine.yield out of skynet.coroutine.resume\n" .. traceback(co))
  404. elseif command == nil then
  405. -- debug trace
  406. return
  407. else
  408. error("Unknown command : " .. command .. "\n" .. traceback(co))
  409. end
  410. end
  411. local co_create_for_timeout
  412. local timeout_traceback
  413. function skynet.trace_timeout(on)
  414. local function trace_coroutine(func, ti)
  415. local co
  416. co = co_create(function()
  417. timeout_traceback[co] = nil
  418. func()
  419. end)
  420. local info = string.format("TIMER %d+%d : ", skynet.now(), ti)
  421. timeout_traceback[co] = traceback(info, 3)
  422. return co
  423. end
  424. if on then
  425. timeout_traceback = timeout_traceback or {}
  426. co_create_for_timeout = trace_coroutine
  427. else
  428. timeout_traceback = nil
  429. co_create_for_timeout = co_create
  430. end
  431. end
  432. skynet.trace_timeout(false) -- turn off by default
  433. function skynet.timeout(ti, func)
  434. local session = auxtimeout(ti)
  435. assert(session)
  436. local co = co_create_for_timeout(func, ti)
  437. assert(session_id_coroutine[session] == nil)
  438. session_id_coroutine[session] = co
  439. return co -- for debug
  440. end
  441. local function suspend_sleep(session, token)
  442. local tag = session_coroutine_tracetag[running_thread]
  443. if tag then c.trace(tag, "sleep", 2) end
  444. session_id_coroutine[session] = running_thread
  445. assert(sleep_session[token] == nil, "token duplicative")
  446. sleep_session[token] = session
  447. return coroutine_yield "SUSPEND"
  448. end
  449. function skynet.sleep(ti, token)
  450. local session = auxtimeout(ti)
  451. assert(session)
  452. token = token or coroutine.running()
  453. local succ, ret = suspend_sleep(session, token)
  454. sleep_session[token] = nil
  455. if succ then
  456. return
  457. end
  458. if ret == "BREAK" then
  459. return "BREAK"
  460. else
  461. error(ret)
  462. end
  463. end
  464. function skynet.yield()
  465. return skynet.sleep(0)
  466. end
  467. function skynet.wait(token)
  468. local session = c.genid()
  469. token = token or coroutine.running()
  470. suspend_sleep(session, token)
  471. sleep_session[token] = nil
  472. session_id_coroutine[session] = nil
  473. end
  474. function skynet.killthread(thread)
  475. local session
  476. -- find session
  477. if type(thread) == "string" then
  478. for k,v in pairs(session_id_coroutine) do
  479. local thread_string = tostring(v)
  480. if thread_string:find(thread) then
  481. session = k
  482. break
  483. end
  484. end
  485. else
  486. local t = fork_queue.t
  487. for i = fork_queue.h, t do
  488. if fork_queue[i] == thread then
  489. table.move(fork_queue, i+1, t, i)
  490. fork_queue[t] = nil
  491. fork_queue.t = t - 1
  492. return thread
  493. end
  494. end
  495. for k,v in pairs(session_id_coroutine) do
  496. if v == thread then
  497. session = k
  498. break
  499. end
  500. end
  501. end
  502. local co = session_id_coroutine[session]
  503. if co == nil then
  504. return
  505. end
  506. local addr = session_coroutine_address[co]
  507. if addr then
  508. session_coroutine_address[co] = nil
  509. session_coroutine_tracetag[co] = nil
  510. local session = session_coroutine_id[co]
  511. if session > 0 then
  512. c.send(addr, skynet.PTYPE_ERROR, session, "")
  513. end
  514. session_coroutine_id[co] = nil
  515. end
  516. if watching_session[session] then
  517. session_id_coroutine[session] = "BREAK"
  518. watching_session[session] = nil
  519. else
  520. session_id_coroutine[session] = nil
  521. end
  522. for k,v in pairs(sleep_session) do
  523. if v == session then
  524. sleep_session[k] = nil
  525. break
  526. end
  527. end
  528. coroutine.close(co)
  529. return co
  530. end
  531. function skynet.self()
  532. return c.addresscommand "REG"
  533. end
  534. function skynet.localname(name)
  535. return c.addresscommand("QUERY", name)
  536. end
  537. skynet.now = c.now
  538. skynet.hpc = c.hpc -- high performance counter
  539. local traceid = 0
  540. function skynet.trace(info)
  541. skynet.error("TRACE", session_coroutine_tracetag[running_thread])
  542. if session_coroutine_tracetag[running_thread] == false then
  543. -- force off trace log
  544. return
  545. end
  546. traceid = traceid + 1
  547. local tag = string.format(":%08x-%d",skynet.self(), traceid)
  548. session_coroutine_tracetag[running_thread] = tag
  549. if info then
  550. c.trace(tag, "trace " .. info)
  551. else
  552. c.trace(tag, "trace")
  553. end
  554. end
  555. function skynet.tracetag()
  556. return session_coroutine_tracetag[running_thread]
  557. end
  558. local starttime
  559. function skynet.starttime()
  560. if not starttime then
  561. starttime = c.intcommand("STARTTIME")
  562. end
  563. return starttime
  564. end
  565. function skynet.time()
  566. return skynet.now()/100 + (starttime or skynet.starttime())
  567. end
  568. function skynet.exit()
  569. fork_queue = { h = 1, t = 0 } -- no fork coroutine can be execute after skynet.exit
  570. skynet.send(".launcher","lua","REMOVE",skynet.self(), false)
  571. -- report the sources that call me
  572. for co, session in pairs(session_coroutine_id) do
  573. local address = session_coroutine_address[co]
  574. if session~=0 and address then
  575. c.send(address, skynet.PTYPE_ERROR, session, "")
  576. end
  577. end
  578. for session, co in pairs(session_id_coroutine) do
  579. if type(co) == "thread" and co ~= running_thread then
  580. coroutine.close(co)
  581. end
  582. end
  583. for resp in pairs(unresponse) do
  584. resp(false)
  585. end
  586. -- report the sources I call but haven't return
  587. local tmp = {}
  588. for session, address in pairs(watching_session) do
  589. tmp[address] = true
  590. end
  591. for address in pairs(tmp) do
  592. c.send(address, skynet.PTYPE_ERROR, 0, "")
  593. end
  594. c.callback(function(prototype, msg, sz, session, source)
  595. c.send(source, skynet.PTYPE_ERROR, session, "")
  596. end)
  597. c.command("EXIT")
  598. -- quit service
  599. coroutine_yield "QUIT"
  600. end
  601. function skynet.getenv(key)
  602. return (c.command("GETENV",key))
  603. end
  604. function skynet.setenv(key, value)
  605. assert(c.command("GETENV",key) == nil, "Can't setenv exist key : " .. key)
  606. c.command("SETENV",key .. " " ..value)
  607. end
  608. function skynet.send(addr, typename, ...)
  609. local p = proto[typename]
  610. return c.send(addr, p.id, 0 , p.pack(...))
  611. end
  612. function skynet.rawsend(addr, typename, msg, sz)
  613. local p = proto[typename]
  614. return c.send(addr, p.id, 0 , msg, sz)
  615. end
  616. skynet.genid = assert(c.genid)
  617. skynet.redirect = function(dest,source,typename,...)
  618. return c.redirect(dest, source, proto[typename].id, ...)
  619. end
  620. skynet.pack = assert(c.pack)
  621. skynet.packstring = assert(c.packstring)
  622. skynet.unpack = assert(c.unpack)
  623. skynet.tostring = assert(c.tostring)
  624. skynet.trash = assert(c.trash)
  625. local function yield_call(service, session)
  626. watching_session[session] = service
  627. session_id_coroutine[session] = running_thread
  628. local succ, msg, sz = coroutine_yield "SUSPEND"
  629. watching_session[session] = nil
  630. if not succ then
  631. error "call failed"
  632. end
  633. return msg,sz
  634. end
  635. function skynet.call(addr, typename, ...)
  636. local tag = session_coroutine_tracetag[running_thread]
  637. if tag then
  638. c.trace(tag, "call", 2)
  639. c.send(addr, skynet.PTYPE_TRACE, 0, tag)
  640. end
  641. local p = proto[typename]
  642. local session = auxsend(addr, p.id , p.pack(...))
  643. if session == nil then
  644. error("call to invalid address " .. skynet.address(addr))
  645. end
  646. return p.unpack(yield_call(addr, session))
  647. end
  648. function skynet.rawcall(addr, typename, msg, sz)
  649. local tag = session_coroutine_tracetag[running_thread]
  650. if tag then
  651. c.trace(tag, "call", 2)
  652. c.send(addr, skynet.PTYPE_TRACE, 0, tag)
  653. end
  654. local p = proto[typename]
  655. local session = assert(auxsend(addr, p.id , msg, sz), "call to invalid address")
  656. return yield_call(addr, session)
  657. end
  658. function skynet.tracecall(tag, addr, typename, msg, sz)
  659. c.trace(tag, "tracecall begin")
  660. c.send(addr, skynet.PTYPE_TRACE, 0, tag)
  661. local p = proto[typename]
  662. local session = assert(auxsend(addr, p.id , msg, sz), "call to invalid address")
  663. local msg, sz = yield_call(addr, session)
  664. c.trace(tag, "tracecall end")
  665. return msg, sz
  666. end
  667. function skynet.ret(msg, sz)
  668. msg = msg or ""
  669. local tag = session_coroutine_tracetag[running_thread]
  670. if tag then c.trace(tag, "response") end
  671. local co_session = session_coroutine_id[running_thread]
  672. if co_session == nil then
  673. error "No session"
  674. end
  675. session_coroutine_id[running_thread] = nil
  676. if co_session == 0 then
  677. if sz ~= nil then
  678. c.trash(msg, sz)
  679. end
  680. return false -- send don't need ret
  681. end
  682. local co_address = session_coroutine_address[running_thread]
  683. local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz)
  684. if ret then
  685. return true
  686. elseif ret == false then
  687. -- If the package is too large, returns false. so we should report error back
  688. c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
  689. end
  690. return false
  691. end
  692. function skynet.context()
  693. local co_session = session_coroutine_id[running_thread]
  694. local co_address = session_coroutine_address[running_thread]
  695. return co_session, co_address
  696. end
  697. function skynet.ignoreret()
  698. -- We use session for other uses
  699. session_coroutine_id[running_thread] = nil
  700. end
  701. function skynet.response(pack)
  702. pack = pack or skynet.pack
  703. local co_session = assert(session_coroutine_id[running_thread], "no session")
  704. session_coroutine_id[running_thread] = nil
  705. local co_address = session_coroutine_address[running_thread]
  706. if co_session == 0 then
  707. -- do not response when session == 0 (send)
  708. return function() end
  709. end
  710. local function response(ok, ...)
  711. if ok == "TEST" then
  712. return unresponse[response] ~= nil
  713. end
  714. if not pack then
  715. error "Can't response more than once"
  716. end
  717. local ret
  718. if unresponse[response] then
  719. if ok then
  720. ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, pack(...))
  721. if ret == false then
  722. -- If the package is too large, returns false. so we should report error back
  723. c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
  724. end
  725. else
  726. ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
  727. end
  728. unresponse[response] = nil
  729. ret = ret ~= nil
  730. else
  731. ret = false
  732. end
  733. pack = nil
  734. return ret
  735. end
  736. unresponse[response] = co_address
  737. return response
  738. end
  739. function skynet.retpack(...)
  740. return skynet.ret(skynet.pack(...))
  741. end
  742. function skynet.wakeup(token)
  743. if sleep_session[token] then
  744. tinsert(wakeup_queue, token)
  745. return true
  746. end
  747. end
  748. function skynet.dispatch(typename, func)
  749. local p = proto[typename]
  750. if func then
  751. local ret = p.dispatch
  752. p.dispatch = func
  753. return ret
  754. else
  755. return p and p.dispatch
  756. end
  757. end
  758. local function unknown_request(session, address, msg, sz, prototype)
  759. skynet.error(string.format("Unknown request (%s): %s", prototype, c.tostring(msg,sz)))
  760. error(string.format("Unknown session : %d from %x", session, address))
  761. end
  762. function skynet.dispatch_unknown_request(unknown)
  763. local prev = unknown_request
  764. unknown_request = unknown
  765. return prev
  766. end
  767. local function unknown_response(session, address, msg, sz)
  768. skynet.error(string.format("Response message : %s" , c.tostring(msg,sz)))
  769. error(string.format("Unknown session : %d from %x", session, address))
  770. end
  771. function skynet.dispatch_unknown_response(unknown)
  772. local prev = unknown_response
  773. unknown_response = unknown
  774. return prev
  775. end
  776. function skynet.fork(func,...)
  777. local n = select("#", ...)
  778. local co
  779. if n == 0 then
  780. co = co_create(func)
  781. else
  782. local args = { ... }
  783. co = co_create(function() func(table.unpack(args,1,n)) end)
  784. end
  785. local t = fork_queue.t + 1
  786. fork_queue.t = t
  787. fork_queue[t] = co
  788. return co
  789. end
  790. local trace_source = {}
  791. local function raw_dispatch_message(prototype, msg, sz, session, source)
  792. -- skynet.PTYPE_RESPONSE = 1, read skynet.h
  793. if prototype == 1 then
  794. local co = session_id_coroutine[session]
  795. if co == "BREAK" then
  796. session_id_coroutine[session] = nil
  797. elseif co == nil then
  798. unknown_response(session, source, msg, sz)
  799. else
  800. local tag = session_coroutine_tracetag[co]
  801. if tag then c.trace(tag, "resume") end
  802. session_id_coroutine[session] = nil
  803. suspend(co, coroutine_resume(co, true, msg, sz, session))
  804. end
  805. else
  806. local p = proto[prototype]
  807. if p == nil then
  808. if prototype == skynet.PTYPE_TRACE then
  809. -- trace next request
  810. trace_source[source] = c.tostring(msg,sz)
  811. elseif session ~= 0 then
  812. c.send(source, skynet.PTYPE_ERROR, session, "")
  813. else
  814. unknown_request(session, source, msg, sz, prototype)
  815. end
  816. return
  817. end
  818. local f = p.dispatch
  819. if f then
  820. local co = co_create(f)
  821. session_coroutine_id[co] = session
  822. session_coroutine_address[co] = source
  823. local traceflag = p.trace
  824. if traceflag == false then
  825. -- force off
  826. trace_source[source] = nil
  827. session_coroutine_tracetag[co] = false
  828. else
  829. local tag = trace_source[source]
  830. if tag then
  831. trace_source[source] = nil
  832. c.trace(tag, "request")
  833. session_coroutine_tracetag[co] = tag
  834. elseif traceflag then
  835. -- set running_thread for trace
  836. running_thread = co
  837. skynet.trace()
  838. end
  839. end
  840. suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
  841. else
  842. trace_source[source] = nil
  843. if session ~= 0 then
  844. c.send(source, skynet.PTYPE_ERROR, session, "")
  845. else
  846. unknown_request(session, source, msg, sz, proto[prototype].name)
  847. end
  848. end
  849. end
  850. end
  851. function skynet.dispatch_message(...)
  852. local succ, err = pcall(raw_dispatch_message,...)
  853. while true do
  854. if fork_queue.h > fork_queue.t then
  855. -- queue is empty
  856. fork_queue.h = 1
  857. fork_queue.t = 0
  858. break
  859. end
  860. -- pop queue
  861. local h = fork_queue.h
  862. local co = fork_queue[h]
  863. fork_queue[h] = nil
  864. fork_queue.h = h + 1
  865. local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
  866. if not fork_succ then
  867. if succ then
  868. succ = false
  869. err = tostring(fork_err)
  870. else
  871. err = tostring(err) .. "\n" .. tostring(fork_err)
  872. end
  873. end
  874. end
  875. assert(succ, tostring(err))
  876. end
  877. function skynet.newservice(name, ...)
  878. return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
  879. end
  880. function skynet.uniqueservice(global, ...)
  881. if global == true then
  882. return assert(skynet.call(".service", "lua", "GLAUNCH", ...))
  883. else
  884. return assert(skynet.call(".service", "lua", "LAUNCH", global, ...))
  885. end
  886. end
  887. function skynet.queryservice(global, ...)
  888. if global == true then
  889. return assert(skynet.call(".service", "lua", "GQUERY", ...))
  890. else
  891. return assert(skynet.call(".service", "lua", "QUERY", global, ...))
  892. end
  893. end
  894. function skynet.address(addr)
  895. if type(addr) == "number" then
  896. return string.format(":%08x",addr)
  897. else
  898. return tostring(addr)
  899. end
  900. end
  901. function skynet.harbor(addr)
  902. return c.harbor(addr)
  903. end
  904. skynet.error = c.error
  905. skynet.tracelog = c.trace
  906. -- true: force on
  907. -- false: force off
  908. -- nil: optional (use skynet.trace() to trace one message)
  909. function skynet.traceproto(prototype, flag)
  910. local p = assert(proto[prototype])
  911. p.trace = flag
  912. end
  913. ----- register protocol
  914. do
  915. local REG = skynet.register_protocol
  916. REG {
  917. name = "lua",
  918. id = skynet.PTYPE_LUA,
  919. pack = skynet.pack,
  920. unpack = skynet.unpack,
  921. }
  922. REG {
  923. name = "response",
  924. id = skynet.PTYPE_RESPONSE,
  925. }
  926. REG {
  927. name = "error",
  928. id = skynet.PTYPE_ERROR,
  929. unpack = function(...) return ... end,
  930. dispatch = _error_dispatch,
  931. }
  932. end
  933. skynet.init = skynet_require.init
  934. -- skynet.pcall is deprecated, use pcall directly
  935. skynet.pcall = pcall
  936. function skynet.init_service(start)
  937. local function main()
  938. skynet_require.init_all()
  939. start()
  940. end
  941. local ok, err = xpcall(main, traceback)
  942. if not ok then
  943. skynet.error("init service failed: " .. tostring(err))
  944. skynet.send(".launcher","lua", "ERROR")
  945. skynet.exit()
  946. else
  947. skynet.send(".launcher","lua", "LAUNCHOK")
  948. end
  949. end
  950. function skynet.start(start_func)
  951. c.callback(skynet.dispatch_message)
  952. init_thread = skynet.timeout(0, function()
  953. skynet.init_service(start_func)
  954. init_thread = nil
  955. end)
  956. end
  957. function skynet.endless()
  958. return (c.intcommand("STAT", "endless") == 1)
  959. end
  960. function skynet.mqlen()
  961. return c.intcommand("STAT", "mqlen")
  962. end
  963. function skynet.stat(what)
  964. return c.intcommand("STAT", what)
  965. end
  966. function skynet.task(ret)
  967. if ret == nil then
  968. local t = 0
  969. for session,co in pairs(session_id_coroutine) do
  970. if co ~= "BREAK" then
  971. t = t + 1
  972. end
  973. end
  974. return t
  975. end
  976. if ret == "init" then
  977. if init_thread then
  978. return traceback(init_thread)
  979. else
  980. return
  981. end
  982. end
  983. local tt = type(ret)
  984. if tt == "table" then
  985. for session,co in pairs(session_id_coroutine) do
  986. local key = string.format("%s session: %d", tostring(co), session)
  987. if co == "BREAK" then
  988. ret[key] = "BREAK"
  989. elseif timeout_traceback and timeout_traceback[co] then
  990. ret[key] = timeout_traceback[co]
  991. else
  992. ret[key] = traceback(co)
  993. end
  994. end
  995. return
  996. elseif tt == "number" then
  997. local co = session_id_coroutine[ret]
  998. if co then
  999. if co == "BREAK" then
  1000. return "BREAK"
  1001. else
  1002. return traceback(co)
  1003. end
  1004. else
  1005. return "No session"
  1006. end
  1007. elseif tt == "thread" then
  1008. for session, co in pairs(session_id_coroutine) do
  1009. if co == ret then
  1010. return session
  1011. end
  1012. end
  1013. return
  1014. end
  1015. end
  1016. function skynet.uniqtask()
  1017. local stacks = {}
  1018. for session, co in pairs(session_id_coroutine) do
  1019. local stack = traceback(co)
  1020. local info = stacks[stack] or {count = 0, sessions = {}}
  1021. info.count = info.count + 1
  1022. if info.count < 10 then
  1023. info.sessions[#info.sessions+1] = session
  1024. end
  1025. stacks[stack] = info
  1026. end
  1027. local ret = {}
  1028. for stack, info in pairs(stacks) do
  1029. local count = info.count
  1030. local sessions = table.concat(info.sessions, ",")
  1031. if count > 10 then
  1032. sessions = sessions .. "..."
  1033. end
  1034. local head_line = string.format("%d\tsessions:[%s]\n", count, sessions)
  1035. ret[head_line] = stack
  1036. end
  1037. return ret
  1038. end
  1039. function skynet.term(service)
  1040. return _error_dispatch(0, service)
  1041. end
  1042. function skynet.memlimit(bytes)
  1043. debug.getregistry().memlimit = bytes
  1044. skynet.memlimit = nil -- set only once
  1045. end
  1046. -- Inject internal debug framework
  1047. local debug = require "skynet.debug"
  1048. debug.init(skynet, {
  1049. dispatch = skynet.dispatch_message,
  1050. suspend = suspend,
  1051. resume = coroutine_resume,
  1052. })
  1053. return skynet