# vim:set et sw=4 ts=8:
|
|
|
|
import json
|
|
import socket
|
|
import sys
|
|
import os
|
|
import threading
|
|
import vim
|
|
import logging
|
|
import msgpack
|
|
import neovim_rpc_server_api_info
|
|
import neovim_rpc_methods
|
|
import threading
|
|
import socket
|
|
import time
|
|
import subprocess
|
|
from neovim.api import common as neovim_common
|
|
import neovim_rpc_protocol
|
|
|
|
# Handle to the Vim script function `neovim_rpc#_error`.  It is called with a
# message string when a client asks for an RPC method that is not implemented.
vim_error = vim.Function('neovim_rpc#_error')
|
|
|
|
# Portable DEVNULL and Queue names for both Python 2 and Python 3.
if sys.version_info.major == 2:
    # Python 2's subprocess module has no DEVNULL; open the null device
    # ourselves.
    DEVNULL = open(os.devnull, 'wb')
    from Queue import Queue, Empty as QueueEmpty
else:
    from subprocess import DEVNULL
    from queue import Queue, Empty as QueueEmpty
|
|
|
|
# Example: enable logging by setting these environment variables when
# launching vim:
#   NVIM_PYTHON_LOG_FILE=nvim.log NVIM_PYTHON_LOG_LEVEL=INFO vim test.md
|
|
|
|
# Expose the socket server module under its Python 3 name on both versions.
try:
    import socketserver  # Python 3
except ImportError:
    import SocketServer as socketserver  # Python 2
|
|
|
|
# ---- module-level state ----

logger = logging.getLogger(__name__)
# Attaching a NullHandler suppresses the annoying error message:
#   No handlers could be found for logger "neovim_rpc_server"
logger.addHandler(logging.NullHandler())

# Messages received from clients, queued as (stream, channel, message) tuples
# until process_pending_requests() drains them.
request_queue = Queue()

# Client responses keyed by rspid; each value is [err, result].
responses = {}
|
|
|
|
def _channel_id_new():
    """Return the next channel id.

    The id comes from a counter kept as an attribute on this function.  The
    counter starts at 0 and is incremented before being returned, so the
    first id handed out is 1.  The increment happens while holding
    ``_channel_id_new._lock``.
    """
    state = _channel_id_new
    with state._lock:
        state._counter = state._counter + 1
        return state._counter


# "static local" state of _channel_id_new, stored as function attributes
_channel_id_new._counter = 0
_channel_id_new._lock = threading.Lock()
|
|
|
|
|
|
class VimHandler(socketserver.BaseRequestHandler):
    """Request handler for the JSON channel that Vim opens to this server.

    Every connection is served by ``handle`` in its own thread.  The
    connection that sends the ``'neovim_rpc_setup'`` message is remembered in
    ``VimHandler._sock`` and is used afterwards to push ``['ex', cmd]``
    commands to Vim.  Any other message is treated as an rpcrequest and is
    forwarded to ``NvimHandler.request``.
    """

    # Held while writing to the remembered main-channel socket.
    _lock = threading.Lock()
    # Socket of the connection that sent 'neovim_rpc_setup'; None until then.
    _sock = None

    @classmethod
    def notify(cls, cmd="call neovim_rpc#_callback()"):
        """Send ``['ex', cmd]`` to Vim over the main channel.

        Does nothing when no connection has completed the setup handshake
        yet.  Errors are logged and never raised.
        """
        try:
            if not VimHandler._sock:
                return
            with VimHandler._lock:
                encoded = json.dumps(['ex', cmd])
                logger.info("sending notification: %s", encoded)
                # sendall: a plain send() may transmit only part of the
                # message.
                VimHandler._sock.sendall(encoded.encode('utf-8'))
        except Exception as ex:
            logger.exception('VimHandler notify exception for [%s]: %s', cmd, ex)

    @classmethod
    def notify_exited(cls, channel):
        """Ask Vim to run ``neovim_rpc#_on_exit(channel)``; errors are logged."""
        try:
            cmd = "call neovim_rpc#_on_exit(%s)" % channel
            cls.notify(cmd)
        except Exception as ex:
            logger.exception('notify_exited for channel [%s] exception: %s', channel, ex)

    @staticmethod
    def _decode_messages(text):
        """Decode every complete JSON value at the start of ``text``.

        Several messages may arrive in a single read, and a message may be
        split across reads, so the buffer is decoded one value at a time.

        Returns ``(messages, consumed)`` where ``consumed`` is the index of
        the first character that is not part of a decoded message (the start
        of an incomplete trailing message, or ``len(text)``).
        """
        decoder = json.JSONDecoder()
        messages = []
        pos = 0
        end = len(text)
        while True:
            # raw_decode() does not accept leading whitespace
            while pos < end and text[pos].isspace():
                pos += 1
            if pos >= end:
                break
            try:
                decoded, pos = decoder.raw_decode(text, pos)
            except ValueError:
                # incomplete (or malformed) trailing data, keep it buffered
                break
            messages.append(decoded)
        return messages, pos

    def _dispatch(self, decoded):
        """Process one decoded message received from Vim."""
        # Send a response if the sequence number is positive.
        # Negative numbers are used for "eval" responses.
        if len(decoded) >= 2 and decoded[0] >= 0 and decoded[1] == 'neovim_rpc_setup':
            with VimHandler._lock:
                VimHandler._sock = self.request
                # initial setup
                # NOTE(review): this command used to be spelled "scall",
                # which is not an Ex command; "call" matches the default
                # command used by notify().
                encoded = json.dumps(['ex', "call neovim_rpc#_callback()"])
                logger.info("sending {0}".format(encoded))
                self.request.sendall(encoded.encode('utf-8'))
            return

        # recognize as rpcrequest
        reqid = decoded[0]
        payload = decoded[1]
        channel = payload[1]
        event = payload[2]
        args = payload[3]
        rspid = payload[4]
        NvimHandler.request(self.request,
                            channel,
                            reqid,
                            event,
                            args,
                            rspid)

    # each connection is a thread
    def handle(self):
        """Read JSON messages from Vim until the connection is closed."""
        logger.info("=== socket opened ===")
        data = b''
        while True:
            try:
                rcv = self.request.recv(4096)
            except socket.error:
                logger.info("=== socket error ===")
                break
            except IOError:
                logger.info("=== socket closed ===")
                break

            if len(rcv) == 0:
                logger.info("=== socket closed ===")
                break

            data += rcv
            logger.info("received: %s", data)

            try:
                text = data.decode('utf-8')
            except UnicodeDecodeError:
                # a multi-byte character may be split across two reads
                logger.info("utf-8 decoding failed, wait for more data")
                continue

            messages, consumed = self._decode_messages(text)
            # keep whatever could not be decoded yet for the next read
            data = text[consumed:].encode('utf-8')
            if data:
                logger.info("json decoding failed, wait for more data")

            for decoded in messages:
                try:
                    self._dispatch(decoded)
                except Exception as ex:
                    # a malformed message must not kill the connection thread
                    logger.exception("failed to handle message %s: %s", decoded, ex)
|
|
|
|
class SocketToStream(object):
    """Minimal file-like wrapper (``read``/``write``) around a socket."""

    def __init__(self, sock):
        self._sock = sock

    def read(self, cnt):
        """Receive up to ``cnt`` bytes, capped at 4096 per call.

        Returns whatever ``recv`` returns; an empty result means the peer
        closed the connection.
        """
        return self._sock.recv(min(cnt, 4096))

    def write(self, w):
        """Write all of ``w`` to the socket and return the byte count.

        ``sendall`` is used because a plain ``send`` may transmit only part
        of a large payload, which would truncate the message.
        """
        self._sock.sendall(w)
        return len(w)
|
|
|
|
class NvimHandler(socketserver.BaseRequestHandler):
    """Request handler for msgpack-rpc client connections.

    Each connection gets a channel id from ``_channel_id_new`` and is served
    by ``handle`` in its own thread.
    """

    # channel id -> dict.  The dict holds the client socket under the key
    # 'sock' and, for every request still waiting for its response, an entry
    # keyed by the integer request id whose value is [rspid, vimsock].
    channel_sockets = {}

    @staticmethod
    def _forward_response(chinfo, unpacked):
        """Handle a response message (type 1) received from a client.

        response format:
        - msg[0]: 1
        - msg[1]: the request id
        - msg[2]: error(if any), format: [code,str]
        - msg[3]: result(if not errored)
        """
        unpacked = neovim_rpc_protocol.from_client(unpacked)
        reqid = int(unpacked[1])
        pending = chinfo.pop(reqid, None)
        if pending is None:
            # nothing is waiting for this id; do not kill the channel for it
            logger.error("response for unknown request id %s ignored", reqid)
            return
        rspid, vimsock = pending
        err = unpacked[2]
        result = unpacked[3]
        # VIM fails to parse response when there a sleep in neovim
        # client. I cannot figure out why. Use global responses to
        # workaround this issue.
        responses[rspid] = [err, result]
        tosend = json.dumps([reqid, ''])
        vimsock.sendall(tosend.encode('utf-8'))

    def handle(self):
        """Read msgpack messages from the client until the stream ends.

        Responses are forwarded to the Vim socket that issued the request.
        Everything else is queued on ``request_queue`` and Vim is notified so
        the message gets processed in the main thread.
        """
        logger.info("=== socket opened for client ===")

        channel = _channel_id_new()

        sock = self.request
        chinfo = dict(sock=sock)
        NvimHandler.channel_sockets[channel] = chinfo

        try:
            f = SocketToStream(sock)
            unpacker = msgpack.Unpacker(f)
            for unpacked in unpacker:
                logger.info("unpacked: %s", unpacked)

                if int(unpacked[0]) == 1:
                    self._forward_response(chinfo, unpacked)
                    continue

                request_queue.put((f, channel, unpacked))
                # notify vim in order to process request in main thread, and
                # avoiding the stupid json protocol
                VimHandler.notify()

            logger.info('channel %s closed.', channel)

        except Exception:
            logger.exception('unpacker failed.')
        finally:
            NvimHandler.channel_sockets.pop(channel, None)
            try:
                sock.close()
            except Exception as ex:
                logger.info("closing channel %s failed: %s", channel, ex)

    @classmethod
    def notify(cls, channel, event, args):
        """Send a notification to the client on ``channel``.

        Unknown channels are logged and ignored; errors are logged and never
        raised.
        """
        try:
            channel = int(channel)
            chinfo = cls.channel_sockets.get(channel)
            if chinfo is None:
                logger.info("channel[%s] not in NvimHandler", channel)
                return
            sock = chinfo['sock']

            # notification format:
            # - msg[0] type, which is 2
            # - msg[1] method
            # - msg[2] arguments
            content = [2, event, args]

            logger.info("notify channel[%s]: %s", channel, content)
            packed = msgpack.packb(neovim_rpc_protocol.to_client(content))
            sock.sendall(packed)
        except Exception as ex:
            logger.exception("notify failed: %s", ex)

    @classmethod
    def request(cls, vimsock, channel, reqid, event, args, rspid):
        """Send a request to the client on ``channel``.

        ``[rspid, vimsock]`` is recorded under ``reqid`` so that ``handle``
        can route the client's response.  Unknown channels are logged and
        ignored; errors are logged and never raised.
        """
        try:
            reqid = int(reqid)
            channel = int(channel)

            # look the channel up before touching it, an unknown channel is
            # an expected situation rather than an error
            chinfo = cls.channel_sockets.get(channel)
            if chinfo is None:
                logger.info("channel[%s] not in NvimHandler", channel)
                return

            sock = chinfo['sock']
            # request format:
            # - msg[0] type, which is 0
            # - msg[1] request id
            # - msg[2] method
            # - msg[3] arguments
            content = [0, reqid, event, args]

            logger.info("request channel[%s]: %s", channel, content)
            packed = msgpack.packb(neovim_rpc_protocol.to_client(content))

            # register before sending so the response can always be routed
            chinfo[reqid] = [rspid, vimsock]
            sock.sendall(packed)
        except Exception as ex:
            logger.exception("request failed: %s", ex)

    @classmethod
    def shutdown(cls):
        """Shut down and close the socket of every registered channel."""
        for channel in list(cls.channel_sockets.keys()):
            chinfo = cls.channel_sockets.get(channel, None)
            if not chinfo:
                continue
            sock = chinfo['sock']
            logger.info("closing client %s", channel)
            try:
                try:
                    # if don't shutdown the socket, vim will never exit
                    # because the recv thread is still blocking
                    sock.shutdown(socket.SHUT_RDWR)
                finally:
                    sock.close()
            except Exception as ex:
                # keep going, the remaining channels still have to be closed
                logger.info("closing client %s failed: %s", channel, ex)
|
|
|
|
|
|
# copied from neovim python-client/neovim/__init__.py
|
|
def _setup_logging(name):
    """Configure file logging from environment variables.

    Nothing is configured unless ``NVIM_PYTHON_LOG_FILE`` is set.  When it
    is, a file handler writing to ``<prefix>_py<major>_<name>`` is added to
    the root logger and this module's logger level is set from
    ``NVIM_PYTHON_LOG_LEVEL``, defaulting to INFO.
    """
    env = os.environ
    module_logger = logging.getLogger(__name__)
    if 'NVIM_PYTHON_LOG_FILE' not in env:
        return

    logfile = '{}_py{}_{}'.format(env['NVIM_PYTHON_LOG_FILE'].strip(),
                                  sys.version_info[0],
                                  name)
    file_handler = logging.FileHandler(logfile, 'w', encoding='utf-8')
    file_handler.formatter = logging.Formatter(
        '%(asctime)s [%(levelname)s @ '
        '%(filename)s:%(funcName)s:%(lineno)s] %(process)s - %(message)s')
    logging.root.addHandler(file_handler)

    level = logging.INFO
    if 'NVIM_PYTHON_LOG_LEVEL' in env:
        requested = getattr(logging,
                            env['NVIM_PYTHON_LOG_LEVEL'].strip(),
                            level)
        # only integer attributes of the logging module are accepted
        if isinstance(requested, int):
            level = requested
    module_logger.setLevel(level)
|
|
|
|
def start():
    """Start both TCP servers and return their addresses.

    Two threaded servers are bound to random ports on 127.0.0.1: one using
    ``VimHandler`` and one using ``NvimHandler``.  Each is served from its
    own daemon thread.

    Returns a two-item list of ``"host:port"`` strings: the ``NvimHandler``
    server address first, then the ``VimHandler`` server address.
    """
    global _vim_server
    global _nvim_server

    _setup_logging('neovim_rpc_server')

    class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
        pass

    def _endpoint(server):
        address = server.server_address
        return "{0}:{1}".format(address[0], address[1])

    # port 0 lets the OS pick a random free port
    _vim_server = ThreadedTCPServer(("127.0.0.1", 0), VimHandler)
    _nvim_server = ThreadedTCPServer(("127.0.0.1", 0), NvimHandler)

    for server in (_vim_server, _nvim_server):
        server.daemon_threads = True

    # One thread per server; that thread then starts one more thread for
    # each request.  Daemon threads exit when the main thread terminates.
    for server in (_vim_server, _nvim_server):
        worker = threading.Thread(target=server.serve_forever)
        worker.daemon = True
        worker.start()

    return [_endpoint(_nvim_server), _endpoint(_vim_server)]
|
|
|
|
def process_pending_requests():
    """Drain ``request_queue`` and process every queued client message.

    Each queue item is a ``(stream, channel, message)`` tuple.  Requests
    (type 0) are answered by writing a msgpack response to ``stream``;
    notifications (type 2) are processed without a reply.  A failure while
    processing one item is logged and does not stop the loop.
    """
    logger.info("process_pending_requests")
    while True:
        # Poll separately from processing so that "queue is empty" is the
        # only way to leave the loop, and no break is needed in a finally
        # clause (which would silently swallow in-flight exceptions).
        try:
            item = request_queue.get(False)
        except QueueEmpty:
            break

        try:
            f, channel, msg = item

            msg = neovim_rpc_protocol.from_client(msg)

            logger.info("get msg from channel [%s]: %s", channel, msg)

            # request format:
            # - msg[0] type, which is 0
            # - msg[1] request id
            # - msg[2] method
            # - msg[3] arguments

            # notification format:
            # - msg[0] type, which is 2
            # - msg[1] method
            # - msg[2] arguments

            if msg[0] == 0:
                # request
                _, req_id, method, args = msg

                try:
                    err = None
                    result = _process_request(channel, method, args)
                except Exception as ex:
                    logger.exception("process failed: %s", ex)
                    # an error occurred, report it back to the client
                    err = [1, str(ex)]
                    result = None

                result = [1, req_id, err, result]
                logger.info("sending result: %s", result)
                packed = msgpack.packb(neovim_rpc_protocol.to_client(result))
                f.write(packed)
                logger.info("sent")
            elif msg[0] == 2:
                # notification
                _, method, args = msg
                try:
                    result = _process_request(channel, method, args)
                    logger.info('notification process result: [%s]', result)
                except Exception as ex:
                    logger.exception("process failed: %s", ex)

        except Exception as ex:
            logger.exception("exception during process: %s", ex)
        finally:
            request_queue.task_done()
|
|
|
|
def _process_request(channel, method, args):
    """Run the RPC ``method`` with ``args`` and return its result.

    ``vim_get_api_info`` is answered directly with ``[channel, API_INFO]``.
    Any other method is looked up by name in ``neovim_rpc_methods`` and
    called with ``*args``.

    Raises:
        Exception: when ``neovim_rpc_methods`` has no attribute ``method``
            (the problem is also logged and reported through ``vim_error``).
    """
    if method == 'vim_get_api_info':
        # this is the first request send from neovim client
        return [channel, neovim_rpc_server_api_info.API_INFO]

    if not hasattr(neovim_rpc_methods, method):
        logger.error("method %s not implemented", method)
        vim_error("rpc method [%s] not implemented in pythonx/neovim_rpc_methods.py. Please send PR or contact the mantainer." % method)
        raise Exception('%s not implemented' % method)

    implementation = getattr(neovim_rpc_methods, method)
    return implementation(*args)
|
|
|
|
def rpcnotify(channel, method, args):
    """Send notification ``method`` with ``args`` to the client on ``channel``.

    Thin wrapper around ``NvimHandler.notify``.
    """
    NvimHandler.notify(channel, event=method, args=args)
|
|
|
|
|
|
def stop():
    """Shut down both servers and close every client socket.

    Every step is best-effort: a failure is logged and the remaining steps
    still run, so that one broken socket or server cannot prevent the rest
    of the shutdown.
    """
    logger.info("stop begin")

    # close tcp channel server
    try:
        _nvim_server.shutdown()
        _nvim_server.server_close()
    except Exception as ex:
        logger.info("_nvim_server shutdown failed: %s", ex)

    # close the main channel
    try:
        vim.command('call ch_close(g:_neovim_rpc_main_channel)')
    except Exception as ex:
        logger.info("ch_close failed: %s", ex)

    # remove all sockets
    try:
        NvimHandler.shutdown()
    except Exception as ex:
        logger.info("NvimHandler shutdown failed: %s", ex)

    try:
        # stop the main channel
        _vim_server.shutdown()
    except Exception as ex:
        logger.info("_vim_server shutodwn failed: %s", ex)

    try:
        _vim_server.server_close()
    except Exception as ex:
        logger.info("_vim_server close failed: %s", ex)
|
|
|
|
|