包含关系 :网络协议是通讯协议在计算机网络领域的
载体关系: 计算机网络是网络协议的物理载体
网络协议 是计算机网络的规范定义的沟通规则
类比:广义沟通规则 含交通法规+手语+摩尔斯电码等所有规则
定义:设备之间进行对话时共同遵守的一套规则和约定(不限介质、场景)
核心:语法(数据格式)、语义(含义解析)、时序(响应顺序)
特性:跨领域通用性 如:网络协议:TCP/IP/硬件协议:PCIe, HDMI/物联网协议:LoRaWAN
类比:公路系统 物理基础设施(道路+桥梁)
定义:通过通信设备和线路连接的自治计算机集合体
核心:实现资源共享(数据/硬件)和信息传递
类型:局域网, 广域网, 网际网
类比:交通法规 确保车辆有序通行的规则(红绿灯/限速)
定义:计算机网络通信
核心: 解决寻址、路由、可靠传输、应用交互等网络特有问题,必须运行在计算机网络中
层级模型:TCP/IP四层模型、OSI七层模型
分层 | 功能 | 协议/设备 | 单位 |
---|---|---|---|
应用层 |
| HTTP(网页) FTP(文件传输) SMTP(邮件) DNS(域名解析) | |
表示层 |
| SSL/TLS(加密) JPEG/MPEG(压缩)ASCII/Unicode(编码) | |
会话层 |
| NetBIOS RPC SSH会话管理 | |
传输层 |
| TCP(可靠连接) UDP(无连接高速传输) | |
网络层 |
| IP(IPv4/IPv6) ICMP OSPF BGP 路由器(Router) | 数据包 |
数据链路层 |
| Ethernet(MAC子层) VLAN 网桥(Bridge) 交换机(Switch) | 帧 |
物理层 | 义物理介质特性 | 以太网 光纤 集线器 中继器 | 比特 |
分层 | 功能 | 协议/设备 | 应用 |
---|---|---|---|
应用层 |
| HTTP(网页) HTTPS(加密网页) FTP(文件传输) SMTP(邮件) DNS(域名解析) |
|
传输层 |
| TCP(可靠连接) • 三次握手建立连接 • 确认重传机制 UDP(无连接高速) • 低延迟、无保障 • 适用于实时应用 |
|
网络层 |
|
| 路由器(Router) • 根据IP地址路由数据包 • 连接不同网络 三层交换机 |
物理层 |
|
HDLC(广域网封装) | 交换机(Switch) • 基于MAC地址转发帧 网卡(NIC) • 物理信号转换 网桥(Bridge) 调制解调器(Modem) |
用户输入 http://
www.uufo.top
→ 应用层生成 HTTP GET 请求报文。
传输层(TCP) → 添加 TCP 头(源端口 54321,目标端口 80)。
网络层(IP) → 添加 IP 头(源 IP 192.168.1.100,目标 IP 93.184.216.34)。
网络接口层(以太网) → 添加帧头(源 MAC 00:11:22:AA:BB:CC,目标 MAC 路由器 MAC)。
物理传输 → 比特流经交换机、路由器抵达目标服务器。
服务器解封装 → 逆向剥离头部,将 HTTP 请求交给 Web 服务程序处理。
tcp三次握手-lts加密-tcp连接结束
客户端 → 服务器(ClientHello)
发送随机数
支持的 TLS 版本
密码套件列表
扩展(如 SNI、支持的椭圆曲线)
服务器 → 客户端(ServerHello)
ServerHello 确认 TLS 版本、密码套件、服务器随机数
Certificate 发送服务器证书链(身份认证)
ServerKeyExchange 提供 ECDHE 参数(临时公钥 + 签名)
ServerHelloDone 通知客户端服务器消息发送完毕
客户端 → 服务器 ClientKeyExchange
发送客户端的 ECDHE 临时公钥
客户端 → 服务器 ChangeCipherSpec
通知:后续消息将使用协商的密钥加密
客户端 → 服务器 Finished
第一条加密消息,验证握手完整性(含 verify_data
)
服务器 → 客户端 ChangeCipherSpec
通知:后续消息将加密
服务器 → 客户端 Finished
加密的 verify_data
,双方确认密钥一致
每个步骤并不严格对应一个独立的网络请求。TLS 协议允许将多个握手消息合并到同一个 TCP 报文段(Packet) 中发送
最少 TCP 请求数
完整 TLS 1.2 握手通常需要 4-5 个 TCP 请求,并非全部
客户端 → 服务器:3 个请求(ClientHello
、ClientKeyExchange
、ChangeCipherSpec + Finished
)
服务器 → 客户端:2 个请求(合并消息、ChangeCipherSpec + Finished
)
步骤2的a-d就是合并为1个tcp请求
# 配置参数 host = '0.0.0.0' # 监听所有网络接口 http_port = 8080 # HTTP服务端口 https_port = 8443 # HTTPS服务端口 cert_dir = 'socket_tls_web/cert/' # SSL证书存放目录 cert_path = os.path.join(cert_dir, 'server.crt') # 证书文件路径 key_path = os.path.join(cert_dir, 'server.key') # 私钥文件路径 print("正在初始化Web服务器...") static_dir = 'socket_tls_web/cert/' # 静态文件目录
import os from generate_cert import SSLCertificateGenerator class StaticFileServer(object): def __init__(self): self.dir = 'socket_tls/static' def set_path(self,dir): self.dir = dir def ensure_static_dir_exists(self): """确保静态文件目录存在,如果不存在则创建,并生成默认的index.html文件 参数: cert_dir: 证书目录 cert_path: 证书文件路径 key_path: 私钥文件路径 返回: bool: 证书设置是否成功 """ if not os.path.exists(self.dir): try: os.makedirs(self.dir) print(f"创建静态文件目录: {self.dir}") # 创建默认的index.html index_path = os.path.join(self.dir, 'index.html') if not os.path.exists(index_path): with open(index_path, 'w', encoding='utf-8') as f: f.write(""" <!DOCTYPE html> <html> <head> <title>Socket TLS Web Server</title> <style> body { font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; } h1 { color: #333; } </style> </head> <body> <h1>Welcome to Socket TLS Web Server</h1> <p>This is a simple web server implemented using Python sockets with TLS support.</p> <p>Available endpoints:</p> <ul> <li><a href="/">/</a> - This page</li> <li><a href="/hello">/hello</a> - A simple greeting</li> <li><a href="/time">/time</a> - Current server time</li> </ul> </body> </html> """) print("创建默认index.html文件") except Exception as e: print(f"创建静态文件目录失败: {e}") raise SystemExit(1) def setup_ssl_certificates(self,cert_path: str, key_path: str) -> bool: """ 设置SSL证书 检查证书是否存在,如不存在则生成自签名证书 参数: cert_dir: 证书目录 cert_path: 证书文件路径 key_path: 私钥文件路径 返回: bool: 证书设置是否成功 """ # 确保证书目录存在 if not os.path.exists(self.dir): try: os.makedirs(self.dir) print(f"创建证书目录: {self.dir}") except Exception as e: print(f"创建证书目录失败: {e}") return False # 检查证书和私钥是否存在 if not os.path.exists(cert_path) or not os.path.exists(key_path): print("证书或私钥不存在,正在生成自签名证书...") print(f"证书路径: {self.dir+cert_path}") try: #调用证书生成类 cert_generator = SSLCertificateGenerator() # 创建证书生成器实例 cert_generator.generate_self_signed_cert(self.dir+cert_path, self.dir+key_path) # 调用方法 print(f"已生成自签名证书: {self.dir+cert_path}") print(f"已生成私钥: {self.dir+key_path}") except Exception as e: print(f"生成证书失败: {e}") return False else: print("使用现有的SSL证书和私钥") return True
证书生成类
from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization from datetime import datetime, timedelta import os from typing import Tuple class SSLCertificateGenerator: def __init__(self, common_name: str = "localhost", country: str = "CN", state: str = "Beijing", locality: str = "Beijing", org: str = "Test Organization", org_unit: str = "IT Department", valid_days: int = 365): self.common_name = common_name self.country = country self.state = state self.locality = locality self.org = org self.org_unit = org_unit self.valid_days = valid_days self.private_key = None self.certificate = None def generate_rsa_key(self, key_size: int = 2048) -> rsa.RSAPrivateKey: """ 生成RSA密钥对 参数: key_size: RSA密钥大小(位),默认2048位 返回: rsa.RSAPrivateKey: 生成的RSA私钥对象 """ return rsa.generate_private_key( public_exponent=65537, # 常用的公开指数 key_size=key_size ) def create_certificate(self, private_key: rsa.RSAPrivateKey) -> x509.Certificate: """ 创建X509证书 参数: private_key: RSA私钥对象 返回: x509.Certificate: 生成的X509证书对象 """ # 创建证书主题 subject = x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, self.common_name), x509.NameAttribute(NameOID.COUNTRY_NAME, self.country), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, self.state), x509.NameAttribute(NameOID.LOCALITY_NAME, self.locality), x509.NameAttribute(NameOID.ORGANIZATION_NAME, self.org), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, self.org_unit), ]) # 设置证书有效期 now = datetime.utcnow() cert_builder = x509.CertificateBuilder( issuer_name=subject, # 自签名证书的颁发者和主题相同 subject_name=subject, public_key=private_key.public_key(), serial_number=x509.random_serial_number(), not_valid_before=now, not_valid_after=now + timedelta(days=self.valid_days) ) # 添加基本约束扩展(表明这是CA证书) cert_builder = cert_builder.add_extension( x509.BasicConstraints(ca=True, path_length=None), critical=True ) # 添加密钥用途扩展 cert_builder = cert_builder.add_extension( x509.KeyUsage( digital_signature=True, key_encipherment=True, content_commitment=True, data_encipherment=False, key_agreement=False, key_cert_sign=True, crl_sign=True, encipher_only=False, decipher_only=False ), critical=True ) # 添加扩展密钥用途扩展 cert_builder = cert_builder.add_extension( x509.ExtendedKeyUsage([ x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH ]), critical=False ) # 添加主题备用名称扩展 cert_builder = cert_builder.add_extension( x509.SubjectAlternativeName([ x509.DNSName(self.common_name), x509.DNSName("localhost"), ]), critical=False ) # 使用SHA256算法签名证书 certificate = cert_builder.sign( private_key=private_key, algorithm=hashes.SHA256() ) return certificate def save_key_and_cert(self, cert_path: str, key_path: str) -> None: """ 保存私钥和证书到文件 参数: cert_path: 证书保存路径 key_path: 私钥保存路径 """ # 确保证书目录存在 cert_dir = os.path.dirname(cert_path) if cert_dir and not os.path.exists(cert_dir): os.makedirs(cert_dir) # 保存私钥(PEM格式) with open(key_path, "wb") as f: f.write(self.private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) # 保存证书(PEM格式) with open(cert_path, "wb") as f: f.write(self.certificate.public_bytes( encoding=serialization.Encoding.PEM )) def generate_self_signed_cert(self, cert_path: str, key_path: str) -> Tuple[str, str]: """ 生成自签名SSL证书和私钥 参数: cert_path: 证书保存路径 key_path: 私钥保存路径 返回: tuple: (证书路径, 私钥路径) """ try: # 生成RSA密钥对 self.private_key = self.generate_rsa_key() # 创建证书 self.certificate = self.create_certificate(self.private_key) # 保存证书和私钥 self.save_key_and_cert(cert_path, key_path) return cert_path, key_path except Exception as e: raise Exception(f"生成证书失败: {e}")
实现web服务
""" Socket TLS Web服务器核心实现 提供HTTP和HTTPS服务器功能,支持静态文件服务和基本路由处理 类: WebServer: 实现基本的HTTP/HTTPS Web服务器功能 """ import os import socket import ssl import threading import time from datetime import datetime from typing import Tuple, Optional, Dict, Union import mimetypes import urllib.parse class WebServer(object): """ Web服务器类,处理HTTP和HTTPS请求 属性: host (str): 服务器主机地址 http_port (int): HTTP服务端口号 https_port (int): HTTPS服务端口号 cert_path (str): SSL证书文件路径 key_path (str): SSL私钥文件路径 static_dir (str): 静态文件目录路径 http_server (socket.socket): HTTP服务器套接字 https_server (socket.socket): HTTPS服务器套接字 """ def __init__(self, host: str = '0.0.0.0', http_port: int = 8080, https_port: int = 8443, cert_path: str = 'socket_tls/cert/cert.crt', key_path: str = 'socket_tls/cert/key.key', static_dir: str = 'socket_tls/static/'): # 初始化Web服务器 self.host = host #服务器监听的主机地址,默认'0.0.0.0'表示所有网络接口 self.http_port = http_port #HTTP服务端口号,默认8080 self.https_port = https_port #HTTP服务端口号,默认8080 self.cert_path = cert_path #SSL证书文件路径,默认'cert/server.crt' self.key_path = key_path #SSL私钥文件路径,默认'cert/server.key' self.static_dir = static_dir #静态文件目录路径,默认'static' # 初始化服务器套接字 self.http_server = None #HTTP服务器套接字 self.https_server = None #HTTPS服务器套接字 def start(self): """ 启动Web服务器 创建并启动HTTP和HTTPS服务器线程 """ # 创建HTTP服务器 self.http_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 创建一个TCP套接字(SOCK_STREAM),用于HTTP服务,使用IPv4地址族(AF_INET) self.http_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #设置同样的套接字选项,允许地址重用 self.http_server.bind((self.host, self.http_port)) #绑定服务器套接字到指定地址和端口 self.http_server.listen(5) #开始监听传入连接,设置最大挂起连接数为5 # 创建HTTPS服务器 self.https_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #同样创建一个TCP套接字,用于HTTPS服务 self.https_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)#设置同样的套接字选项,允许地址重用 self.https_server.bind((self.host, self.https_port)) #将HTTPS套接字绑定到指定的主机地址和HTTPS端口上 self.https_server.listen(5) #开始监听HTTPS传入连接,最大挂起连接数同样为5 # #创建http和https的线程 #创建一个线程http_thread,用于运行_run_server方法,传入HTTP套接字和参数False(表示这是HTTP服务) http_thread = threading.Thread(target=self._run_server, args=(self.http_server, False)) #创建另一个线程https_thread,用于运行_run_server方法,传入HTTPS套接字和参数True https_thread = threading.Thread(target=self._run_server, args=(self.https_server, True)) #启动线程 http_thread.start() https_thread.start() print(f"HTTP服务器运行在 http://{self.host}:{self.http_port}") print(f"HTTPS服务器运行在 https://{self.host}:{self.https_port}") def _run_server(self, server: socket.socket, is_https: bool) -> None: print(f"启动 {server.getsockname()} 服务") #运行服务器主循环 #server: 服务器套接字对象 #s_https: 是否是HTTPS服务器 #循环,持续监听并接受客户端连接。 while True: try: #使用 server.accept() 阻塞等待客户端连接 #当有客户端连接时,返回一个新的客户端套接字对象 client_socket 和客户端地址 client_address client_socket, client_address = server.accept() #判断请求协议是否是HTTPS if is_https: # 配置SSL上下文 print("https加载证书") context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) #创建一个 ssl.SSLContext 对象,指定使用 PROTOCOL_TLS_SERVER 协议 context.load_cert_chain(self.cert_path, self.key_path) #加载证书和私钥文件(路径由 self.cert_path 和 self.key_path 指定) print("https加载证书成功") client_socket = context.wrap_socket(client_socket, server_side=True) #使用 context.wrap_socket() 方法将普通的 client_socket 包装成一个支持SSL/TLS 的安全套接字 # 为每个客户端创建新线程 #client_socket:客户端套接字; #client_address:客户端地址; #is_https:是否是 HTTPS 连接; client_thread = threading.Thread( target=self._handle_client, args=(client_socket, client_address, is_https) ) #调用 start() 方法启动线程 client_thread.start() except Exception as e: print(f"处理连接时出错: {e}") def _handle_client(self, client_socket: Union[socket.socket, ssl.SSLSocket], client_address: Tuple[str, int], is_https: bool) -> None: #处理客户端连接 #client_socket: 客户端套接字 #client_address: 客户端地址元组 (IP, 端口) #is_https: 是否是HTTPS连接 try: # 接收请求数据 request_data = client_socket.recv(1024) if not request_data: return print(f"收到来自 {client_address} 的请求: {request_data}") # 解析请求 method, path, headers = self._parse_request(request_data) # 处理请求 if path == '/': # 默认返回index.html response = self._serve_file('index.html') elif path == '/hello': response = self._create_response("Hello from the server!") elif path == '/time': current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') response = self._create_response(f"Current time: {current_time}") elif path.startswith('/static/'): # 处理静态文件请求 file_path = path[8:] # 移除'/static/'前缀 response = self._serve_file(file_path) else: # 尝试直接作为静态文件路径处理 response = self._serve_file(path.lstrip('/')) # 发送响应 client_socket.send(response) except Exception as e: print(f"处理客户端 {client_address} 时出错: {e}") finally: client_socket.close() def _parse_request(self, request_data: bytes) -> Tuple[str, str, Dict[str, str]]: #解析HTTP请求数据 #request_data: 原始HTTP请求数据 #tuple: (请求方法, 请求路径, 请求头字典) try: # 解码请求数据 request_text = request_data.decode('utf-8') request_lines = request_text.split('\r\n') # 解析请求行 method, path, _ = request_lines[0].split(' ') # 解析请求头 headers = {} for line in request_lines[1:]: if ':' in line: key, value = line.split(':', 1) headers[key.strip()] = value.strip() # URL解码路径 path = urllib.parse.unquote(path) return method, path, headers except Exception as e: print(f"解析请求时出错: {e}") return 'GET', '/', {} def _serve_file(self, file_path: str) -> bytes: #提供静态文件服务 #file_path: 请求的文件路径 #bytes: HTTP响应数据 # 构建完整的文件路径 full_path = os.path.join(self.static_dir, file_path) print(full_path) try: # 检查文件是否存在且在static目录下 if not os.path.exists(full_path) or not os.path.isfile(full_path): return self._create_response("404 Not Found", status_code=404) # 读取文件内容 with open(full_path, 'rb') as f: content = f.read() # 获取文件的MIME类型 content_type, _ = mimetypes.guess_type(full_path) if not content_type: content_type = 'application/octet-stream' # 创建响应头 headers = { 'Content-Type': content_type, 'Content-Length': len(content), 'Connection': 'close' } # 构建响应 response_headers = '\r\n'.join([f'{k}: {v}' for k, v in headers.items()]) response = f'HTTP/1.1 200 OK\r\n{response_headers}\r\n\r\n'.encode() return response + content except Exception as e: print(f"提供文件 {file_path} 时出错: {e}") return self._create_response("500 Internal Server Error", status_code=500) def _create_response(self, content: str, status_code: int = 200, content_type: str = 'text/html') -> bytes: """ 创建HTTP响应 参数: content: 响应内容 status_code: HTTP状态码,默认200 content_type: 内容类型,默认'text/html' 返回: bytes: 编码后的HTTP响应 """ status_messages = { 200: 'OK', 404: 'Not Found', 500: 'Internal Server Error' } response = f"""HTTP/1.1 {status_code} {status_messages.get(status_code, 'OK')} Content-Type: {content_type} Content-Length: {len(content)} Connection: close Date: {time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime())} Server: Python-Socket-Web-Server {content}""" return response.encode() def stop(self) -> None: """ 停止服务器 关闭HTTP和HTTPS服务器套接字 """ if self.http_server: self.http_server.close() if self.https_server: self.https_server.close()
配置服务启动
import os import signal import sys from StaticFileServer import StaticFileServer from server import WebServer def setup_signal_handlers(server: WebServer) -> None: """ 设置信号处理器 处理SIGINT和SIGTERM信号,优雅地关闭服务器 参数: server: WebServer实例 """ def signal_handler(sig, frame): """ 信号处理函数 处理中断信号,停止服务器并退出程序 参数: sig: 信号编号 frame: 当前栈帧 """ print("\n接收到中断信号,正在关闭服务器...") server.stop() sys.exit(0) # 注册信号处理器 signal.signal(signal.SIGINT, signal_handler) # 处理Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # 处理终止信号 def main(): print( "Hello World") static_server = StaticFileServer() static_server.ensure_static_dir_exists() static_server.set_path("socket_tls/cert/") print(static_server.dir) static_server.setup_ssl_certificates('socket_tls/cert/cert.pem', 'socket_tls/cert/key.pem') try: server = WebServer( '0.0.0.0', 8080, 8443, static_server.dir+'cert.pem', static_server.dir+'key.pem', "socket_tls/static/", ) # 设置信号处理器 setup_signal_handlers(server) print("正在启动Web服务器...") server.start() print("\n服务器已启动:") print("\n可用路径:") print("- / - 主页") print("- /hello - 问候消息") print("- /time - 当前服务器时间") except Exception as e: print(f"启动服务器时出错: {e}") sys.exit(1) if __name__ == '__main__': main()