Python用内置模块来构建REST服务、RPC服务
liuian 2024-11-28 00:46 21 浏览
1写在前面
- 和小伙伴们分享一些Python 网络编程的一些笔记,博文为《Python Cookbook》读书后笔记整理
- 博文涉及内容包括:
- TCP/UDP服务构建
- 不使用框架创建一个REST风格的HTTP服务
- 基于XML-RPC实现简单的RPC
- 基于multiprocessing实现简单的RPC
- python实现作为客户端与HTTP服务交互
- 理解不足小伙伴帮忙指正
傍晚时分,坐在屋檐下,看着天慢慢地黑下去,心里寂寞而凄凉,感到自己的生命被剥夺了。当时我是个年轻人,但我害怕这样生活下去,衰老下去。在我看来,这是比死亡更可怕的事。--------王小波
在Python中,构建一个静态Web服务器,只需要 python3 -m http.server 端口号( 端口号不指定默认是8000) 这一条命令就可以搞定了,之前也有看到有公司内网中,一些安装包放到服务器上每次FTP麻烦,用http模块的方式很方便。
python在网络方面封装一些内置模块,可以用很简洁的代码实现端到端的通信,比如HTTP、RPC服务等。
在编写RPC和REST服务之前,先来温习一下常见的的基于Socket模块的一些端到端的通信协议。不管是RPC还是REST都需要底层的通信协议来支持。
对于TCP和UPD协议,在常见的网络通信中,浏览器,邮件等一般应用程序在收发数据时都是通过TCP协议的,DNS等收发较短的控制数据时一般会使用UDP。
创建TCP服务
实现一个服务器,通过 TCP 协议和客户端通信。
创建一个 TCP 服务器的一个简单方法是使用socketserver库。一起来温习下面这个简单的TCP服务器
from socketserver import BaseRequestHandler, TCPServer
class EchoHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
while True:
#接收客户端发送的数据, 这次接收数据的最大字节数是8192
msg = self.request.recv(8192)
# 接收的到数据在发送回去
if not msg:
break
self.request.send(msg)
if __name__ == '__main__':
# 20000端口,默认IP为本地IP,监听到消息交个EchoHandler处理器
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
代码很简单,指定IP暴露对应的端口,这里通过serv.serve_forever()来保证连接一直存在。 Got connection from ('127.0.0.1', 1675)
建立好服务端之后看下客户端
- AF_INET:表示ipv4
- SOCK_STREAM: tcp传输协议
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect(('localhost', 20000))
>>> s.send(b'Hello')
5
>>> s.recv(8192)
b'Hello'
>>>
socketserver 默认情况下这种服务器是单线程的,一次只能为一个客户端连接服务。如果想通过多线程处理多个客户端,可以初始化一个ForkingTCPServer 或者是ThreadingTCPServer对象。
from socketserver import ThreadingTCPServer
if __name__ == '__main__':
serv = ThreadingTCPServer(('', 20000), EchoHandler)
serv.serve_forever()
使用 fork 或线程服务器有个潜在问题就是它们会为每个客户端连接创建一个新的进程或线程。由于客户端连接数是没有限制的,因此一个恶意的黑客可以同时发送大量的连接让的服务器奔溃。
可以创建一个预先分配大小的 工作线程池或进程池,先创建一个普通的非线程服务器,然后在一个线程池中使用serve forever()方法来启动它们。
if __name__ == '__main__':
from threading import Thread
NWORKERS = 16
serv = TCPServer(('', 20000), EchoHandler)
for n in range(NWORKERS):
t = Thread(target=serv.serve_forever)
t.daemon = True
t.start()
serv.serve_forever()
一般来讲,一个TCPServer在实例化的时候会绑定并激活相应的socket,如果 bind_and_activate 为真,则构造方法会自动调用server_bind() 和 server_activate()方法。其余参数传给基类 BaseServer,有时候想通过设置某些选项去调整底下的socket,可以设置参数bind_and_activate=False。
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)
# Set up various socket options
serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# Bind and activate
serv.server_bind()
serv.server_activate()
serv.serve_forever()
socket.SO_REUSEADDR允许服务器重新绑定一个之前使用过的端口号。由于要被经常使用到,它被放置到类变量中,可以直接在 TCPServer上面设置
当然,也可以不使用一个工具类,直接使用原生的Socket的编写TCP服务端
from socket import socket, AF_INET, SOCK_STREAM
def echo_handler(address, client_sock):
print('Got connection from {}'.format(address))
while True:
msg = client_sock.recv(8192)
if not msg:
break
client_sock.sendall(msg)
client_sock.close()
def echo_server(address, backlog=5):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(address)
sock.listen(backlog)
while True:
client_sock, client_addr = sock.accept()
echo_handler(client_addr, client_sock)
if __name__ == '__main__':
echo_server(('', 20000))
了解了TCP的服务通信,在来看一下UDP的
创建UDP服务
实现一个基于 UDP 协议的服务器来与客户端通信。
跟 TCP 一样,UDP 服务器也可以通过使用socketserver库很容易的被创建。例如,下面是一个简单的时间服务器:
from socketserver import BaseRequestHandler, UDPServer
import time
class TimeHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
# Get message and client socket request 属性是一个包含了数据报和底层 socket 对象的元组
msg, sock = self.request
resp = time.ctime()
sock.sendto(resp.encode('ascii'), self.client_address)
if __name__ == '__main__':
serv = UDPServer(('', 20000), TimeHandler)
serv.serve_forever()
测试一下
>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
(b'Tue May 3 11:48:53 2022', ('127.0.0.1', 20000))
>>>
对于UPD协议而言,对于数据报的传送,应该使用 socket 的sendto() 和 recvfrom() 方法,因为是面向无连接的,没有建立连接的步骤,但是要在发生时跟着接受方
了解基本的通信协议之后,回到今天要讲的ERST接口。REST接口是基于HTTP协议的,而HTTP是直接依赖TCP的协议栈,负责约束表示层
创建一个简单的REST接口
使用一个简单的 REST 接口通过网络远程控制或访问的应用程序,但是又不想自己去安装一个完整的 web 框架。
可以构建一个 REST 风格的接口,最简单的方法是创建一个基于 WSGI 标准(Web服务网关接口,PEP 3333)的很小的库。类似支持REST风格的Python Web框架 Flask。
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : app.py
@Time : 2022/05/03 14:43:56
@Author : Li Ruilong
@Version : 1.0
@Contact : 1224965096@qq.com
@Desc : None
"""
# here put the import lib
import time
import cgi
def notfound_404(environ, start_response):
start_response('404 Not Found', [('Content-type', 'text/plain')])
return [b'Not Found']
# 核心控制器,用于路由注册
class PathDispatcher:
def __init__(self):
# 映射字典
self.pathmap = {}
# 核心控制器的回调
def __call__(self, environ, start_response):
# 获取路由
path = environ['PATH_INFO']
# 获取请求参数
params = cgi.FieldStorage(environ['wsgi.input'],
environ=environ)
# 获取请求方法
method = environ['REQUEST_METHOD'].lower()
environ['params'] = {key: params.getvalue(key) for key in params}
# 找到映射的函数
handler = self.pathmap.get((method, path), notfound_404)
# 返回函数
return handler(environ, start_response)
def register(self, method, path, function):
# 请求方法和路由作为K,执行函数为V
self.pathmap[method.lower(), path] = function
return function
_hello_resp = "wo jiao {name}"
def hello_world(environ, start_response):
start_response('200 OK', [('Content-type', 'text/html')])
params = environ['params']
resp = _hello_resp.format(name=params.get('name'))
yield resp.encode('utf-8')
_localtime_resp = "dang qian shjian {t}"
# 路由的回调
def localtime(environ, start_response):
start_response('200 OK', [('Content-type', 'application/xml')])
resp = _localtime_resp.format(t=time.localtime())
yield resp.encode('utf-8')
if __name__ == '__main__':
from wsgiref.simple_server import make_server
# 创建一个核心控制器,用于路由注册
dispatcher = PathDispatcher()
# 注册路由,对应的回调方法
dispatcher.register('GET', '/hello', hello_world)
dispatcher.register('GET', '/localtime', localtime)
# Launch a basic server 监听8080端口,注入核心控制器
httpd = make_server('', 8080, dispatcher)
print('Serving on port 8080...')
httpd.serve_forever()
测试一下
┌──[root@liruilongs.github.io]-[~]
└─$coproc (./app.py)
[2] 130447
curl localhost:8080/hello
┌──[root@liruilongs.github.io]-[~]
└─$curl localhost:8080/hello
127.0.0.1 - - [03/May/2022 16:09:12] "GET /hello HTTP/1.1" 200 12
wo jiao None
curl localhost:8080/hello?name=liruilong
┌──[root@liruilongs.github.io]-[~]
└─$curl localhost:8080/hello?name=liruilong
127.0.0.1 - - [03/May/2022 16:09:47] "GET /hello?name=liruilong HTTP/1.1" 200 17
wo jiao liruilong
┌──[root@liruilongs.github.io]-[~]
└─$jobs
....
[2]- 运行中 coproc COPROC ( ./app.py ) &
实现一个简单的 REST 接口,只需让的程序代码满足 Python 的 WSGI标准即可。WSGI 被标准库支持,同时也被绝大部分第三方 web 框架支持。
这里感觉Python Web的WSGI标准和Java Web 体系的Servlet规范特别接近,但是Servlet是侵入式的,同时需要特定的Web容器(Tomcat)支持,而WSGI好像对代码的影响很少...感兴趣小伙伴可以研究下.
另一方面,通过上面的代码,可以对当下这种Web端MVC的设计模式流程(Flask,Django,SpringMVC)有一个基本的认识,当然实际的框架要复杂的多。但是基本构建思路一样。
关于WSGI标准,简单来分析一下
以一个可调用对象形式来实现路由匹配要操作的方法
import cgi
def wsgi_app(environ, start_response):
pass
environ 属性是一个字典,包含了从 web 服务器如 Apache[参考 Internet RFC 3875]提供的 CGI 接口中获取的值。要将这些不同的值提取出来,可以像这么这样写:
def wsgi_app(environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
# Parse the query parameters
params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)
start_response 参数是一个为了初始化一个请求对象而必须被调用的函数。第一个参数是返回的 HTTP 状态值,第二个参数是一个 (名, 值) 元组列表,用来构建返回的 HTTP 头。
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
为了返回数据,一个 WSGI 程序必须返回一个字节字符串序列。可以像下面这样使用一个列表来完成
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
resp = []
resp.append(b'Hello World\n')
resp.append(b'Goodbye!\n')
return resp
或者,还可以使用 yield
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
yield b'Hello World\n'
yield b'Goodbye!\n'
最后返回的必须是字节字符串。如果返回结果包含文本字符串,必须先将其编码成字节。图片也是OK的
class WSGIApplication:
def __init__(self):
...
def __call__(self, environ, start_response)
...
PathDispatcher 类。这个分发器仅仅只是管理一个字典,将 (方法, 路径) 对映射到处理器函数上面。当一个请求到来时,它的方法和路径被提取出来,然后被分发到对应的处理器上面去。
dispatcher = PathDispatcher()
# 注册路由,对应的回调
dispatcher.register('GET', '/hello', hello_world)
dispatcher.register('GET', '/localtime', localtime)
任何查询变量会被解析后放到一个字典中,以 environ['params'] 形式存储。后面这个步骤太常见,所以建议在分发器里面完成,这样可以省掉很多重复代码。使用分发器的时候,只需简单的创建一个实例,然后通过它注册各种 WSGI 形式的函数。编写这些函数应该超级简单了,只要遵循 start_response() 函数的编写规则,并且最后返回字节字符串即可。
WSGI 本身是一个很小的标准。因此它并没有提供一些高级的特性比如认证、cookies、重定向、全局的异常处理等。这些自己实现起来也不难。不过如果想要更多的支持,可以考虑第三方库
上面服务端的构建,我们使用了curl工具来访问,那么作为客户端Python有哪些交互方式?
作为客户端与HTTP服务交互
需要通过 HTTP 协议以客户端的方式访问多种服务。例如,下载数据或者与基于 REST 的 API 进行交互。
对于简单的事情来说,通常使用urllib.request模块就够了.一个Get请求的Demo
┌──[root@liruilongs.github.io]-[~]
└─$python3
Python 3.6.8 (default, Nov 16 2020, 16:55:22)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from urllib import request, parse
>>> url = 'http://httpbin.org/get'
>>> parms = {
... 'name1': 'value1',
... 'name2': 'value2'
... }
>>> querystring = parse.urlencode(parms)
>>> querystring
'name1=value1&name2=value2'
>>> request.urlopen(url+'?' + querystring)
<http.client.HTTPResponse object at 0x7ffa0ef0f710>
>>> u = request.urlopen(url+'?' + querystring)
>>> u.read()
b'{
"args": {
"name1": "value1",
"name2": "value2"
},
"headers": {
"Accept-Encoding": "identity",
"Host": "httpbin.org",
"User-Agent": "Python-urllib/3.6",
"X-Amzn-Trace-Id": "Root=1-62707b15-41a1169c0897c9001a07f948"
},
"origin": "39.154.13.139",
"url": "http://httpbin.org/get?name1=value1&name2=value2"
}'
如果需要使用POST方法在请求主体中发送查询参数,可以将参数编码后作为可选参数提供给urlopen()函数,就像这样:
>>> from urllib import request, parse
>>> url = 'http://httpbin.org/post'
>>> parms = {
... 'name1' : 'value1',
... 'name2' : 'value2'
... }
>>> querystring = parse.urlencode(parms)
>>> querystring.encode('ascii')
b'name1=value1&name2=value2'
>>> u = request.urlopen(url, querystring.encode('ascii'))
>>> resp = u.read()
>>> resp
b'{
"args": {},
"data": "",
"files": {},
"form": {
"name1": "value1",
"name2": "value2"
},
"headers": {
"Accept-Encoding": "identity",
"Content-Length": "25",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "httpbin.org",
"User-Agent": "Python-urllib/3.6",
"X-Amzn-Trace-Id": "Root=1-62707d24-15e9944760d3bbaa36c3714a"
},
"json": null,
"origin": "39.154.13.139",
"url": "http://httpbin.org/post"
}'
>>>
在发出的请求中提供一些自定义的 HTTP 请求首部,创建一个 Request 实例然后将其传给urlopen()
>>> from urllib import request, parse
>>> headers = {
... 'User-agent' : 'none/ofyourbusiness',
... 'Spam' : 'Eggs'
... }
>>> req = request.Request(url, querystring.encode('ascii'), headers=headers)
>>> u = request.urlopen(req)
>>> u.read()
b'{
"args": {},
"data": "",
"files": {},
"form": {
"name1": "value1",
"name2": "value2"
},
"headers": {
"Accept-Encoding": "identity",
"Content-Length": "25",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "httpbin.org",
"Spam": "Eggs",
"User-Agent": "none/ofyourbusiness",
"X-Amzn-Trace-Id": "Root=1-62707f0e-308a8137555e15797d950018"
},
"json": null,
"origin": "39.154.13.139",
"url": "http://httpbin.org/post"
}'
>>>
如果需要交互的服务,可以使用 requests 模块, 这个不是自带模块,需要安装python3 -m pip install requests
>>> import requests
>>> url = 'http://httpbin.org/post'
>>> parms = {
... 'name1' : 'value1',
... 'name2' : 'value2'
... }
>>> headers = {
... 'User-agent' : 'none/ofyourbusiness',
... 'Spam' : 'Eggs'
... }
>>> resp = requests.post(url, data=parms, headers=headers)
>>> resp.text
'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "name1": "value1", \n "name2": "value2"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "25", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "httpbin.org", \n "Spam": "Eggs", \n "User-Agent": "none/ofyourbusiness", \n "X-Amzn-Trace-Id": "Root=1-62708080-7a14319e699baa2e35a352fb"\n }, \n "json": null, \n "origin": "39.154.13.139", \n "url": "http://httpbin.org/post"\n}\n'
>>> resp.json()
{'args': {}, 'data': '', 'files': {}, 'form': {'name1': 'value1', 'name2': 'value2'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '25', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'Spam': 'Eggs', 'User-Agent': 'none/ofyourbusiness', 'X-Amzn-Trace-Id': 'Root=1-62708080-7a14319e699baa2e35a352fb'}, 'json': None, 'origin': '39.154.13.139', 'url': 'http://httpbin.org/post'}
>>>
requests 模块支持很多种数据的返会方式,可以直接返回以 Unicode 解码的响应文本,也可以返回JSON数据
利用 requests 库发起一个 HEAD 请求
>>> import requests
>>> resp = requests.head( 'http://httpbin.org/post')
>>> resp
<Response [405]>
>>> resp = requests.head( 'http://httpbin.org/')
>>> resp
<Response [200]>
>>> resp.status_code
200
>>> resp.headers['content-length']
'9593'
>>> resp.headers['content-type']
'text/html; charset=utf-8'
>>> resp.text
''
>>>
如果决定坚持使用标准的程序库而不考虑像requests这样的第三方库,可以使用底层的 http.client 模块来实现自己的代码。
from http.client import HTTPConnection
from urllib import parse
c = HTTPConnection('www.python.org', 80)
c.request('HEAD', '/index.html')
resp = c.getresponse()
print('Status', resp.status)
for name, value in resp.getheaders():
print(name, value)
测试 HTTP 客户端,考虑使用httpbin服务(http://httpbin.org)。这个站点会接收发出的请求,然后以JSON 的形式将相应信息回传回来。
>>> import requests
>>> r = requests.get('http://httpbin.org/get?name=Dave&n=37',
... headers = { 'User-agent': 'goaway/1.0' })
>>> resp = r.json()
>>> resp['headers']
{'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'goaway/1.0', 'X-Amzn-Trace-Id': 'Root=1-62708c06-7c7d8cc4441479c65faea5b4'}
>>>
通过XML-RPC实现简单的远程调用
RPC,通俗的讲,想找到一个方式去运行在远程机器上面的 Python 程序中的函数或方法。
实现一个远程方法调用的最简单方式是使用 XML-RPC。下面实现了键 值存储功能的简单RPC服务器:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : app.py
@Time : 2022/05/03 17:07:02
@Author : Li Ruilong
@Version : 1.0
@Contact : 1224965096@qq.com
@Desc : None
"""
# here put the import lib
from xmlrpc.server import SimpleXMLRPCServer
class KeyValueServer:
_rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
def __init__(self, address):
self._data = {}
self._serv = SimpleXMLRPCServer(address, allow_none=True)
# 注册方法
for name in self._rpc_methods_:
self._serv.register_function(getattr(self, name))
def get(self, name):
return self._data[name]
def set(self, name, value):
self._data[name] = value
def delete(self, name):
del self._data[name]
def exists(self, name):
return name in self._data
def keys(self):
return list(self._data)
def serve_forever(self):
self._serv.serve_forever()
# Example
if __name__ == '__main__':
kvserv = KeyValueServer(('', 15001))
kvserv.serve_forever()
RPC客户端测试
PS E:\docker> python
Python 3.9.0 (tags/v3.9.0:9cf6752, Oct 5 2020, 15:23:07) [MSC v.1927 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from xmlrpc.client import ServerProxy
>>> s = ServerProxy('http://localhost:15001', allow_none=True)
>>> s.set('foo','bar')
>>> s.set('spam', [1, 2, 3])
>>> s.keys()
['foo', 'spam']
>>> s.get('foo')
'bar'
>>> s.get('spam')
[1, 2, 3]
>>> s.delete('spam')
>>> s.exists('spam')
False
>>>
XML-RPC 可以让很容易的构造一个简单的远程调用服务。所需要做的仅仅是创建一个服务器实例,通过它的方法register_function()来注册函数,然后使用方法serve_forever()启动它。在上面将这些步骤放在一起写到一个类中
这并不是必须的。还可以像下面这样创建一个服务器:
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCServer
def add(x,y):
return x+y
serv = SimpleXMLRPCServer(('', 15000))
serv.register_function(add)
serv.serve_forever()
XML-RPC 暴露出来的函数只能适用于部分数据类型,比如字符串、整形、列表和字典,不应该将 XML-RPC 服务以公共 API 的方式暴露出来。
XML-RPC 的一个缺点是它的性能。SimpleXMLRPCServer 的实现是单线程的,所以它不适合于大型程序
由于 XML-RPC 将所有数据都序列化为 XML 格式,所以它会比其他的方式运行的慢一些。但是它也有优点,这种方式的编码可以被绝大部分其他编程语言支持。通过使用这种方式,其他语言的客户端程序都能访问的服务。
通过 multiprocessing 实现RPC调用
在一个消息传输层如 sockets、multiprocessing.connections或zeroMQ的基础之上实现一个简单的远程过程调用(RPC)
将函数请求、参数和返回值使用pickle编码后,在不同的解释器直接传送pickle字节字符串,可以很容易的实现RPC。下面是一个简单的PRC处理器,可以被整合到一个服务器中去:
RPC 服务端
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : rpcserver.py
@Time : 2022/07/08 20:16:21
@Author : Li Ruilong
@Version : 1.0
@Contact : 1224965096@qq.com
@Desc : 远程调用服务
"""
# here put the import lib
import pickle
from multiprocessing.connection import Listener
from threading import Thread
"""
@Time : 2022/07/08 20:28:02
@Author : Li Ruilong
@Version : 1.0
@Desc : None
Args:
远程调用处理器
Returns:
void
"""
class RPCHandler:
def __init__(self):
self._functions = {}
"""
@Time : 2022/07/08 20:16:47
@Author : Li Ruilong
@Version : 1.0
@Desc : 函数注册
Args:
func
Returns:
void
"""
def register_function(self, func):
self._functions[func.__name__] = func
"""
@Time : 2022/07/08 20:17:51
@Author : Li Ruilong
@Version : 1.0
@Desc : 调用函数
Args:
connection
Returns:
void
"""
def handle_connection(self, connection):
try:
while True:
func_name, args, kwargs = pickle.loads(connection.recv())
try:
print("调用函数:",(func_name, args, kwargs))
r = self._functions[func_name](*args,**kwargs)
print("返回结果:",r)
connection.send(pickle.dumps(r))
except Exception as e:
connection.send(pickle.dumps(e))
except Exception as e:
pass
def rpc_server(handler, address, authkey):
sock = Listener(address, authkey=authkey)
while True:
client = sock.accept()
t = Thread(target=handler.handle_connection, args=(client,))
t.daemon = True
print("函数开始执行")
t.start()
def add(x, y):
return x + y
def sub(x, y):
return x - y
if __name__ == '__main__':
print(format("开始加载RPC处理器",'》<20'))
handler = RPCHandler()
print(format("处理器加载完成,注册函数",'》<20'))
handler.register_function(add)
handler.register_function(sub)
print(format("函数注册成功,服务启动",'》<20'))
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')
RPC 客户端
import pickle
from multiprocessing.connection import Client
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
print("开始调用函数",name)
def do_rpc(*args, **kwargs):
self._connection.send(pickle.dumps((name, args, kwargs)))
result = pickle.loads(self._connection.recv())
print("返回结果",result)
if isinstance(result, Exception):
raise result
return result
return do_rpc
c = Client(('localhost', 17000), authkey=b'peekaboo')
print(format("建立连接,创建RPC代理",'》<30'),c)
proxy = RPCProxy(c)
print(format("创建代理成功",'》<30'))
print("add(2, 3) = ",proxy.add(2, 3) )
print("sub(2, 3) = ", proxy.sub(2, 3))
D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/rpcserver.py
开始加载RPC处理器》》》》》》》》》》
处理器加载完成,注册函数》》》》》》》》
函数注册成功,服务启动》》》》》》》》》
函数开始执行
调用函数: ('add', (2, 3), {})
返回结果: 5
调用函数: ('sub', (2, 3), {})
返回结果: -1
==============
D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/RPC.py
建立连接,创建RPC代理》》》》》》》》》》》》》》》》》》 <multiprocessing.connection.Connection object at 0x00DFACA0>
创建代理成功》》》》》》》》》》》》》》》》》》》》》》》》
开始调用函数 add
返回结果 5
add(2, 3) = 5
开始调用函数 sub
返回结果 -1
sub(2, 3) = -1
Process finished with exit code 0
RPCHandler和RPCProxy的基本思路是很比较简单的。
如果一个客户端想要调用一个远程函数,比如foo(1,2,z=3),代理类创建一个包含了函数名和参数的元组(foo',(1,2),{'z':3})。这个元组被 pickle 序列化后通过网络连接发生出去。
由于底层需要依赖 pickle,那么安全问题就需要考虑了(因为一个聪明的黑客可以创建特定的消息,能够让任意函数通过 pickle反序列化后被执行)。
因此永远不要允许来自不信任或未认证的客户端的RPC。特别是绝对不要允许来自Internet的任意机器的访问,这种只能在内部被使用,位于防火墙后面并且不要对外暴露。
作为pickle的替代,也许可以考虑使用JSON、XML或一些其他的编码格式来序列化消息。
例如,本机实例可以很容易的改写成JSON编码方案。还需要将pickle.1oads()和pickle.dumps()替换成json.1oads()和json.dumps()即可:
# here put the import lib
import json
........
def handle_connection(self, connection):
try:
while True:
# 反序列化
func_name, args, kwargs = json.loads(connection.recv())
try:
print("调用函数:",(func_name, args, kwargs))
r = self._functions[func_name](*args,**kwargs)
print("返回结果:",r)
# 序列化发送
connection.send(json.dumps(r))
except Exception as e:
connection.send(json.dumps(e))
except Exception as e:
pass
......
import json
from multiprocessing.connection import Client
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
print("开始调用函数",name)
def do_rpc(*args, **kwargs):
print("JSON 序列化后的值",json.dumps((name, args, kwargs)))
self._connection.send(json.dumps((name, args, kwargs)))
result = json.loads(self._connection.recv())
print("返回结果",result)
if isinstance(result, Exception):
raise result
return result
return do_rpc
c = Client(('localhost', 17000), authkey=b'peekaboo')
print(format("建立连接,创建RPC代理",'》<30'),c)
proxy = RPCProxy(c)
print(format("创建代理成功",'》<30'))
print("add(2, 3) = ",proxy.add(2, 3) )
print("sub(2, 3) = ", proxy.sub(2, 3))
可以看到序列化后的结果
D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/RPC.py
建立连接,创建RPC代理》》》》》》》》》》》》》》》》》》 <multiprocessing.connection.Connection object at 0x0078AD30>
创建代理成功》》》》》》》》》》》》》》》》》》》》》》》》
开始调用函数 add
JSON 序列化后的值 ["add", [2, 3], {}]
返回结果 5
add(2, 3) = 5
开始调用函数 sub
JSON 序列化后的值 ["sub", [2, 3], {}]
返回结果 -1
sub(2, 3) = -1
实现RPC的一个比较复杂的问题是如何去处理异常。至少,当方法产生异常时服务器不应该奔溃。因此,返回给客户端的异常所代表的含义就要好好设计了。
如果使用pickle,异常对象实例在客户端能被反序列化并抛出。如果使用其他的协议,那得想想另外的方法了。不过至少,应该在响应中返回异常字符串。在JSON的例子中就是使用的这种方式。
博文内容参考
- 《Python Cookbook》
- What is WSGI?:https://wsgi.readthedocs.io/en/latest/what.html
- PEP 3333 - Python Web 服务器网关接口 v1.0.1:https://peps.python.org/pep-3333/
相关推荐
- 打开新世界,教你用RooCode+Copliot+Mcp打造一个自己的Manus
-
本文耗时两天打造,想要一遍走通需要花点时间,建议找个专注的时间开搞!这不仅是个免费使用claude3.5的方案,也是一个超级智能体方案,绝对值得一试!最近Manus真是赚足了眼球,然而我还是没有邀请码...
- Git仓库(git仓库有哪些)
-
#Git仓库使用方法流程详解##一、环境搭建与基础配置###1.1安装与初始化-**安装Git**:官网下载安装包,默认配置安装-**配置全局信息**:```bashgitconfig...
- idea版的cursor:Windsurf Wave 7(ideawalk)
-
在企业环境中,VisualStudioCode和JetBrains系列是最常用的开发工具,覆盖了全球绝大多数开发者。这两类IDE各有优势,但JetBrains系列凭借其针对特定语言和企业场景的深度...
- Ai 编辑器 Cursor 零基础教程:推箱子小游戏实战演练
-
最近Ai火的同时,Ai编辑器Cursor同样火了一把。今天我们就白漂一下Cursor,使用免费版本搞一个零基础教程,并实战演练一个“网页版的推箱子小游戏”。通过这篇文章,让你真正了解cursor是什么...
- ChatGPT深度集成于苹果Mac软件 编码能力得到提升
-
【CNMO科技消息】近日,OpenAI发布了针对MacOS的桌面应用程序,并宣布了一系列与各类应用程序的互操作性功能,标志着ChatGPT正在从聊天机器人向AI智能体工具进化。此次发布的MacOS桌面...
- 日常开发中常用的git操作命令和使用技巧
-
日常开发中常用的git操作命令,从配置、初始化本地仓库到提交代码的常用git操作命令使用git前的配置刚使用git,先要在电脑上安装好git,接着我们需要配置一下帐户信息:用户名和邮箱。#设置用户名...
- Trae IDE 如何与 GitHub 无缝对接?
-
TraeIDE内置了GitHub集成功能,让开发者可以直接在IDE里管理代码仓库和版本控制。1.直接从GitHub克隆项目如果你想把GitHub上的代码拉到本地,Trae提供了...
- China's diplomacy to further provide strong support for country's modernization: FM
-
BEIJING,March7(Xinhua)--ChineseForeignMinisterWangYisaidFridaythatChina'sdiplomacywil...
- 三十分钟入门基础Go(Java小子版)(java入门级教程)
-
前言Go语言定义Go(又称Golang)是Google的RobertGriesemer,RobPike及KenThompson开发的一种静态、强类型、编译型语言。Go语言语法与...
- China will definitely take countermeasures in response to arbitrary pressure: FM
-
BEIJING,March7(Xinhua)--Chinawilldefinitelytakecountermeasuresinresponsetoarbitrarypre...
- Go操作etcd(go操作docker实现沙箱)
-
Go语言操作etcd,这里推荐官方包etcd/clientv3。文档:https://pkg.go.dev/go.etcd.io/etcd/clientv3etcdv3使用gRPC进行远程过程调...
- 腾讯 Go 性能优化实战(腾讯游戏优化软件)
-
作者:trumanyan,腾讯CSIG后台开发工程师项目背景网关服务作为统一接入服务,是大部分服务的统一入口。为了避免成功瓶颈,需要对其进行尽可能地优化。因此,特别总结一下golang后台服务...
- golang 之JWT实现(golang gin jwt)
-
什么是JSONWebToken?JSONWebToken(JWT)是一个开放标准(RFC7519),它定义了一种紧凑且自包含的方式,用于在各方之间以JSON方式安全地传输信息。由于此信息是经...
- 一文看懂 session 和 cookie(session cookie的区别)
-
-----------cookie大家应该都熟悉,比如说登录某些网站一段时间后,就要求你重新登录;再比如有的同学很喜欢玩爬虫技术,有时候网站就是可以拦截住你的爬虫,这些都和cookie有关。如果...
- 有望取代 java?GO 语言项目了解一下
-
GO语言在编程界一直让人又爱又恨,有人说“GO将统治下一个十年”,“几乎所有新的、有趣的东西都是用Go写的”;也有人说它过于死板,使用感太差。国外有Google、AWS、Cloudflar...
- 一周热门
-
-
Python实现人事自动打卡,再也不会被批评
-
Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
-
一个解决支持HTML/CSS/JS网页转PDF(高质量)的终极解决方案
-
再见Swagger UI 国人开源了一款超好用的 API 文档生成框架,真香
-
【验证码逆向专栏】vaptcha 手势验证码逆向分析
-
网页转成pdf文件的经验分享 网页转成pdf文件的经验分享怎么弄
-
C++ std::vector 简介
-
python使用fitz模块提取pdf中的图片
-
《人人译客》如何规划你的移动电商网站(2)
-
Jupyterhub安装教程 jupyter怎么安装包
-
- 最近发表
-
- 打开新世界,教你用RooCode+Copliot+Mcp打造一个自己的Manus
- Git仓库(git仓库有哪些)
- idea版的cursor:Windsurf Wave 7(ideawalk)
- Ai 编辑器 Cursor 零基础教程:推箱子小游戏实战演练
- ChatGPT深度集成于苹果Mac软件 编码能力得到提升
- 日常开发中常用的git操作命令和使用技巧
- Trae IDE 如何与 GitHub 无缝对接?
- China's diplomacy to further provide strong support for country's modernization: FM
- 三十分钟入门基础Go(Java小子版)(java入门级教程)
- China will definitely take countermeasures in response to arbitrary pressure: FM
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- table.render (33)
- uniapp textarea (33)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- react-admin (33)
- vscode切换git分支 (35)
- vscode美化代码 (33)
- python bytes转16进制 (35)