通讯协议

网络协议   2025-08-16 09:18   120   0  

通讯协议

概念

包含关系 :网络协议是通讯协议在计算机网络领域的

载体关系: 计算机网络是网络协议的物理载体

网络协议 是计算机网络的规范定义的沟通规则

通讯协议

类比:广义沟通规则  含交通法规+手语+摩尔斯电码等所有规则

定义:设备之间进行对话时共同遵守的一套规则和约定(不限介质、场景)

核心:语法(数据格式)、语义(含义解析)、时序(响应顺序)

特性:跨领域通用性 如:网络协议:TCP/IP/硬件协议:PCIe, HDMI/物联网协议:LoRaWAN

计算机网络

类比:公路系统  物理基础设施(道路+桥梁)

定义:通过通信设备和线路连接的自治计算机集合体

核心:实现资源共享(数据/硬件)和信息传递

类型:局域网, 广域网, 网际网

网络协议

类比:交通法规  确保车辆有序通行的规则(红绿灯/限速)

定义:计算机网络通信

核心: 解决寻址、路由、可靠传输、应用交互等网络特有问题,必须运行在计算机网络中

层级模型:TCP/IP四层模型、OSI七层模型

网络协议模型

OSI模型

层级

分层

功能

协议/设备

单位

应用层

  • 为用户应用程序提供网络服务接口

  • 定义应用级协议规范

HTTP(网页)

FTP(文件传输)

SMTP(邮件)

DNS(域名解析)


表示层

  • 数据格式转换(编码/解码)

  • 加密/解密

  • 压缩/解压缩

SSL/TLS(加密)

JPEG/MPEG(压缩)ASCII/Unicode(编码)


会话层

  • 建立、管理、终止应用程序间会话

  • 会话同步与恢复(如断点续传)

NetBIOS

RPC

SSH会话管理


传输层

  • 端到端连接管理(TCP/UDP)

  • 数据分段/重组

  • 可靠性保证(TCP的确认重传机制)

  • 流量控制与拥塞控制

TCP(可靠连接)

UDP(无连接高速传输)


网络层

  • 逻辑寻址(IP地址)

  • 路由选择(最佳路径决策)

  • 跨网络的数据包传输

IP(IPv4/IPv6)

ICMP

OSPF

BGP

路由器(Router)

数据包

数据链路层

  • 将比特流组织成帧

  • 物理寻址(MAC地址)

  • 错误检测(CRC校验)

  • 流量控制(如交换机)

Ethernet(MAC子层)

VLAN

网桥(Bridge)

交换机(Switch)

物理层

义物理介质特性

以太网  光纤 集线器 中继器

比特

数据封装与解封装流程

TCP/IP模型

层级

分层

功能

协议/设备

应用

应用层

  • 整合了 OSI 的应用层、表示层、会话层功能

  • 定义应用协议规范(如 HTTP 的请求/响应结构)

  • 数据编码(JSON/XML)、加密(TLS)、会话管理(Cookie)

HTTP(网页)

HTTPS(加密网页)

FTP(文件传输)

SMTP(邮件)

DNS(域名解析)

  • 应用程序(浏览器、邮件客户端)

  • 服务器(Web服务器、DNS服务器)

传输层

  • 端到端连接管理(TCP/UDP)

  • 数据分段/重组

  • 可靠性保证(TCP的确认重传机制)

  • 高速无连接传输(UDP)

  • 流量控制与拥塞控制

TCP(可靠连接)

• 三次握手建立连接

• 确认重传机制

UDP(无连接高速)

• 低延迟、无保障

• 适用于实时应用

  • 操作系统协议栈(TCP/UDP模块)

  • 防火墙(端口过滤)

网络层

  • 逻辑寻址(IP地址)

  • 路由选择(最佳路径决策)

  • 跨网络的数据包传输

  • 数据包分片与重组

  • 错误诊断与控制

  • IP(IPv4/IPv6)

  • ICMP(网络诊断,如 ping

  • ARP(IP→MAC地址解析)

  • OSPF/BGP(动态路由协议)

路由器(Router)

• 根据IP地址路由数据包

• 连接不同网络

三层交换机

物理层

  • 物理介质访问控制

  • 帧封装(添加MAC地址)

  • 误检测(CRC)

  • 比特流传输

  • Ethernet(有线局域网)

  • Wi-Fi(IEEE 802.11)

  • PPP(拨号连接)

HDLC(广域网封装)

交换机(Switch)

• 基于MAC地址转发帧

网卡(NIC)

• 物理信号转换

网桥(Bridge)

调制解调器(Modem)

数据流程

  1. 用户输入 http://www.uufo.top → 应用层生成 HTTP GET 请求报文。

  2. 传输层(TCP) → 添加 TCP 头(源端口 54321,目标端口 80)。

  3. 网络层(IP) → 添加 IP 头(源 IP 192.168.1.100,目标 IP 93.184.216.34)。

  4. 网络接口层(以太网) → 添加帧头(源 MAC 00:11:22:AA:BB:CC,目标 MAC 路由器 MAC)。

  5. 物理传输 → 比特流经交换机、路由器抵达目标服务器。

  6. 服务器解封装 → 逆向剥离头部,将 HTTP 请求交给 Web 服务程序处理。

流程图


常见协议

Http  TCP

请求流程

tcp三次握手-lts加密-tcp连接结束

LTS1.2++ ECDHE_RSA 流程

步骤

  1. 客户端 → 服务器(ClientHello)

    1. 发送随机数

    2. 支持的 TLS 版本

    3. 密码套件列表

    4. 扩展(如 SNI、支持的椭圆曲线)

  2. 服务器 → 客户端(ServerHello)

    1. ServerHello    确认 TLS 版本、密码套件、服务器随机数

    2. Certificate    发送服务器证书链(身份认证)

    3. ServerKeyExchange 提供 ECDHE 参数(临时公钥 + 签名)

    4. ServerHelloDone 通知客户端服务器消息发送完毕

  3. 客户端 → 服务器 ClientKeyExchange

    1. 发送客户端的 ECDHE 临时公钥

  4. 客户端 → 服务器 ChangeCipherSpec

    1. 通知:后续消息将使用协商的密钥加密

  5. 客户端 → 服务器 Finished

    1. 第一条加密消息,验证握手完整性(含 verify_data

  6. 服务器 → 客户端 ChangeCipherSpec

    1. 通知:后续消息将加密

  7. 服务器 → 客户端 Finished

    1. 加密的 verify_data,双方确认密钥一致

请求流程合并或消息合并

  • 每个步骤并不严格对应一个独立的网络请求。TLS 协议允许将多个握手消息合并到同一个 TCP 报文段(Packet) 中发送

  • 最少 TCP 请求数

    • 完整 TLS 1.2 握手通常需要 4-5 个 TCP 请求,并非全部

    • 客户端 → 服务器:3 个请求(ClientHelloClientKeyExchangeChangeCipherSpec + Finished

    • 服务器 → 客户端:2 个请求(合并消息、ChangeCipherSpec + Finished

  • 步骤2的a-d就是合并为1个tcp请求

socket+tls实践

初始化并启动Web服务器

配置参数

# 配置参数
    host = '0.0.0.0'  # 监听所有网络接口
    http_port = 8080   # HTTP服务端口
    https_port = 8443   # HTTPS服务端口
    cert_dir = 'socket_tls_web/cert/'   # SSL证书存放目录
    cert_path = os.path.join(cert_dir, 'server.crt')  # 证书文件路径
    key_path = os.path.join(cert_dir, 'server.key')  # 私钥文件路径
    print("正在初始化Web服务器...")
    static_dir = 'socket_tls_web/cert/'  # 静态文件目录

配置静态目录

import os
from generate_cert import SSLCertificateGenerator

class StaticFileServer(object):
    def __init__(self):
        self.dir = 'socket_tls/static'
    def set_path(self,dir):
        self.dir = dir
    def ensure_static_dir_exists(self):
        """确保静态文件目录存在,如果不存在则创建,并生成默认的index.html文件
        参数:
            cert_dir: 证书目录
            cert_path: 证书文件路径
            key_path: 私钥文件路径
            
        返回:
            bool: 证书设置是否成功
        """
        
        if not os.path.exists(self.dir):
            try:
                os.makedirs(self.dir)
                print(f"创建静态文件目录: {self.dir}")

                # 创建默认的index.html
                index_path = os.path.join(self.dir, 'index.html')
                if not os.path.exists(index_path):
                    with open(index_path, 'w', encoding='utf-8') as f:
                        f.write("""
<!DOCTYPE html>
<html>
<head>
    <title>Socket TLS Web Server</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }
        h1 { color: #333; }
    </style>
</head>
<body>
    <h1>Welcome to Socket TLS Web Server</h1>
    <p>This is a simple web server implemented using Python sockets with TLS support.</p>
    <p>Available endpoints:</p>
    <ul>
        <li><a href="/">/</a> - This page</li>
        <li><a href="/hello">/hello</a> - A simple greeting</li>
        <li><a href="/time">/time</a> - Current server time</li>
    </ul>
</body>
</html>
""")
                print("创建默认index.html文件")
            except Exception as e:
                print(f"创建静态文件目录失败: {e}")
                raise SystemExit(1)
    def setup_ssl_certificates(self,cert_path: str, key_path: str) -> bool:
        """
        设置SSL证书
        检查证书是否存在,如不存在则生成自签名证书
        参数:
            cert_dir: 证书目录
            cert_path: 证书文件路径
            key_path: 私钥文件路径
        返回:
            bool: 证书设置是否成功
        """
        # 确保证书目录存在
        if not os.path.exists(self.dir):
            try:
                os.makedirs(self.dir)
                print(f"创建证书目录: {self.dir}")
            except Exception as e:
                print(f"创建证书目录失败: {e}")
                return False
        # 检查证书和私钥是否存在
        if not os.path.exists(cert_path) or not os.path.exists(key_path):
            print("证书或私钥不存在,正在生成自签名证书...")
            print(f"证书路径: {self.dir+cert_path}")
            try:
                #调用证书生成类
                cert_generator = SSLCertificateGenerator()  # 创建证书生成器实例
                cert_generator.generate_self_signed_cert(self.dir+cert_path, self.dir+key_path)  # 调用方法
                print(f"已生成自签名证书: {self.dir+cert_path}")
                print(f"已生成私钥: {self.dir+key_path}")
            except Exception as e:
                print(f"生成证书失败: {e}")
                return False
        else:
            print("使用现有的SSL证书和私钥")
        
        return True

证书生成类

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from datetime import datetime, timedelta
import os
from typing import Tuple

class SSLCertificateGenerator:
    def __init__(self, common_name: str = "localhost", country: str = "CN", state: str = "Beijing", locality: str = "Beijing", org: str = "Test Organization", org_unit: str = "IT Department", valid_days: int = 365):
        self.common_name = common_name
        self.country = country
        self.state = state
        self.locality = locality
        self.org = org
        self.org_unit = org_unit
        self.valid_days = valid_days
        self.private_key = None
        self.certificate = None

    def generate_rsa_key(self, key_size: int = 2048) -> rsa.RSAPrivateKey:
        """
        生成RSA密钥对

        参数:
            key_size: RSA密钥大小(位),默认2048位

        返回:
            rsa.RSAPrivateKey: 生成的RSA私钥对象
        """
        return rsa.generate_private_key(
            public_exponent=65537,  # 常用的公开指数
            key_size=key_size
        )

    def create_certificate(self, private_key: rsa.RSAPrivateKey) -> x509.Certificate:
        """
        创建X509证书

        参数:
            private_key: RSA私钥对象

        返回:
            x509.Certificate: 生成的X509证书对象
        """
        # 创建证书主题
        subject = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, self.common_name),
            x509.NameAttribute(NameOID.COUNTRY_NAME, self.country),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, self.state),
            x509.NameAttribute(NameOID.LOCALITY_NAME, self.locality),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, self.org),
            x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, self.org_unit),
        ])

        # 设置证书有效期
        now = datetime.utcnow()
        cert_builder = x509.CertificateBuilder(
            issuer_name=subject,  # 自签名证书的颁发者和主题相同
            subject_name=subject,
            public_key=private_key.public_key(),
            serial_number=x509.random_serial_number(),
            not_valid_before=now,
            not_valid_after=now + timedelta(days=self.valid_days)
        )

        # 添加基本约束扩展(表明这是CA证书)
        cert_builder = cert_builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True
        )

        # 添加密钥用途扩展
        cert_builder = cert_builder.add_extension(
            x509.KeyUsage(
                digital_signature=True,
                key_encipherment=True,
                content_commitment=True,
                data_encipherment=False,
                key_agreement=False,
                key_cert_sign=True,
                crl_sign=True,
                encipher_only=False,
                decipher_only=False
            ),
            critical=True
        )

        # 添加扩展密钥用途扩展
        cert_builder = cert_builder.add_extension(
            x509.ExtendedKeyUsage([
                x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
                x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
            ]),
            critical=False
        )

        # 添加主题备用名称扩展
        cert_builder = cert_builder.add_extension(
            x509.SubjectAlternativeName([
                x509.DNSName(self.common_name),
                x509.DNSName("localhost"),
            ]),
            critical=False
        )

        # 使用SHA256算法签名证书
        certificate = cert_builder.sign(
            private_key=private_key,
            algorithm=hashes.SHA256()
        )

        return certificate

    def save_key_and_cert(self, cert_path: str, key_path: str) -> None:
        """
        保存私钥和证书到文件

        参数:
            cert_path: 证书保存路径
            key_path: 私钥保存路径
        """
        # 确保证书目录存在
        cert_dir = os.path.dirname(cert_path)
        if cert_dir and not os.path.exists(cert_dir):
            os.makedirs(cert_dir)

        # 保存私钥(PEM格式)
        with open(key_path, "wb") as f:
            f.write(self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ))

        # 保存证书(PEM格式)
        with open(cert_path, "wb") as f:
            f.write(self.certificate.public_bytes(
                encoding=serialization.Encoding.PEM
            ))

    def generate_self_signed_cert(self, cert_path: str, key_path: str) -> Tuple[str, str]:
        """
        生成自签名SSL证书和私钥

        参数:
            cert_path: 证书保存路径
            key_path: 私钥保存路径

        返回:
            tuple: (证书路径, 私钥路径)
        """
        try:
            # 生成RSA密钥对
            self.private_key = self.generate_rsa_key()
            # 创建证书
            self.certificate = self.create_certificate(self.private_key)
            # 保存证书和私钥
            self.save_key_and_cert(cert_path, key_path)

            return cert_path, key_path

        except Exception as e:
            raise Exception(f"生成证书失败: {e}")

实现web服务

"""
Socket TLS Web服务器核心实现
提供HTTP和HTTPS服务器功能,支持静态文件服务和基本路由处理
类:
    WebServer: 实现基本的HTTP/HTTPS Web服务器功能
"""
import os
import socket
import ssl
import threading
import time
from datetime import datetime
from typing import Tuple, Optional, Dict, Union
import mimetypes
import urllib.parse

class WebServer(object):
    """
    Web服务器类,处理HTTP和HTTPS请求
    
    属性:
        host (str): 服务器主机地址
        http_port (int): HTTP服务端口号
        https_port (int): HTTPS服务端口号
        cert_path (str): SSL证书文件路径
        key_path (str): SSL私钥文件路径
        static_dir (str): 静态文件目录路径
        http_server (socket.socket): HTTP服务器套接字
        https_server (socket.socket): HTTPS服务器套接字
    """
    def __init__(self,
                 host: str = '0.0.0.0',
                 http_port: int = 8080,
                 https_port: int = 8443,
                 cert_path: str = 'socket_tls/cert/cert.crt',
                 key_path: str = 'socket_tls/cert/key.key',
                 static_dir: str = 'socket_tls/static/'):
        # 初始化Web服务器
        self.host = host #服务器监听的主机地址,默认'0.0.0.0'表示所有网络接口
        self.http_port = http_port #HTTP服务端口号,默认8080
        self.https_port = https_port #HTTP服务端口号,默认8080
        self.cert_path = cert_path #SSL证书文件路径,默认'cert/server.crt'
        self.key_path = key_path  #SSL私钥文件路径,默认'cert/server.key'
        self.static_dir = static_dir  #静态文件目录路径,默认'static'
         # 初始化服务器套接字
        self.http_server = None  #HTTP服务器套接字
        self.https_server = None #HTTPS服务器套接字
    def start(self):
        """
        启动Web服务器  创建并启动HTTP和HTTPS服务器线程
        """
        # 创建HTTP服务器
        self.http_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 创建一个TCP套接字(SOCK_STREAM),用于HTTP服务,使用IPv4地址族(AF_INET)
        self.http_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #设置同样的套接字选项,允许地址重用
        self.http_server.bind((self.host, self.http_port)) #绑定服务器套接字到指定地址和端口
        self.http_server.listen(5) #开始监听传入连接,设置最大挂起连接数为5
        # 创建HTTPS服务器
        self.https_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #同样创建一个TCP套接字,用于HTTPS服务
        self.https_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)#设置同样的套接字选项,允许地址重用
        self.https_server.bind((self.host, self.https_port)) #将HTTPS套接字绑定到指定的主机地址和HTTPS端口上
        self.https_server.listen(5) #开始监听HTTPS传入连接,最大挂起连接数同样为5
        # #创建http和https的线程
        #创建一个线程http_thread,用于运行_run_server方法,传入HTTP套接字和参数False(表示这是HTTP服务)
        http_thread = threading.Thread(target=self._run_server, args=(self.http_server, False))
        #创建另一个线程https_thread,用于运行_run_server方法,传入HTTPS套接字和参数True
        https_thread = threading.Thread(target=self._run_server, args=(self.https_server, True))
        #启动线程
        http_thread.start()
        https_thread.start()
        print(f"HTTP服务器运行在 http://{self.host}:{self.http_port}")
        print(f"HTTPS服务器运行在 https://{self.host}:{self.https_port}")
    def _run_server(self, server: socket.socket, is_https: bool) -> None:
        print(f"启动 {server.getsockname()} 服务")
        #运行服务器主循环
        #server: 服务器套接字对象
        #s_https: 是否是HTTPS服务器
        #循环,持续监听并接受客户端连接。
        while True:
            try:
                #使用 server.accept() 阻塞等待客户端连接
                #当有客户端连接时,返回一个新的客户端套接字对象 client_socket 和客户端地址 client_address
                client_socket, client_address = server.accept()
                #判断请求协议是否是HTTPS
                if is_https:
                    # 配置SSL上下文
                    print("https加载证书")
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) #创建一个 ssl.SSLContext 对象,指定使用 PROTOCOL_TLS_SERVER 协议
                    context.load_cert_chain(self.cert_path, self.key_path) #加载证书和私钥文件(路径由 self.cert_path 和 self.key_path 指定)
                    print("https加载证书成功")
                    client_socket = context.wrap_socket(client_socket, server_side=True) #使用 context.wrap_socket() 方法将普通的 client_socket 包装成一个支持SSL/TLS 的安全套接字
                # 为每个客户端创建新线程
                #client_socket:客户端套接字;
                #client_address:客户端地址;
                #is_https:是否是 HTTPS 连接;
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, client_address, is_https)
                )
                #调用 start() 方法启动线程
                client_thread.start()
            except Exception as e:
                print(f"处理连接时出错: {e}")
    def _handle_client(self, 
                      client_socket: Union[socket.socket, ssl.SSLSocket],
                      client_address: Tuple[str, int],
                      is_https: bool) -> None:
        #处理客户端连接
        #client_socket: 客户端套接字
        #client_address: 客户端地址元组 (IP, 端口)
        #is_https: 是否是HTTPS连接
        try:
            # 接收请求数据
            request_data = client_socket.recv(1024)
            if not request_data:
                return
            print(f"收到来自 {client_address} 的请求: {request_data}")
            # 解析请求
            method, path, headers = self._parse_request(request_data)
            
            # 处理请求
            if path == '/':
                # 默认返回index.html
                response = self._serve_file('index.html')
            elif path == '/hello':
                response = self._create_response("Hello from the server!")
            elif path == '/time':
                current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                response = self._create_response(f"Current time: {current_time}")
            elif path.startswith('/static/'):
                # 处理静态文件请求
                file_path = path[8:]  # 移除'/static/'前缀
                response = self._serve_file(file_path)
            else:
                # 尝试直接作为静态文件路径处理
                response = self._serve_file(path.lstrip('/'))
            # 发送响应
            client_socket.send(response)
            
        except Exception as e:
            print(f"处理客户端 {client_address} 时出错: {e}")
        finally:
            client_socket.close()
    def _parse_request(self, request_data: bytes) -> Tuple[str, str, Dict[str, str]]:
        #解析HTTP请求数据
        #request_data: 原始HTTP请求数据
        #tuple: (请求方法, 请求路径, 请求头字典)
        try:
            # 解码请求数据
            request_text = request_data.decode('utf-8')
            request_lines = request_text.split('\r\n')
            
            # 解析请求行
            method, path, _ = request_lines[0].split(' ')
            
            # 解析请求头
            headers = {}
            for line in request_lines[1:]:
                if ':' in line:
                    key, value = line.split(':', 1)
                    headers[key.strip()] = value.strip()
            
            # URL解码路径
            path = urllib.parse.unquote(path)
            return method, path, headers
        except Exception as e:
            print(f"解析请求时出错: {e}")
            return 'GET', '/', {}
    def _serve_file(self, file_path: str) -> bytes:
        #提供静态文件服务
        #file_path: 请求的文件路径
        #bytes: HTTP响应数据
        # 构建完整的文件路径
        full_path = os.path.join(self.static_dir, file_path)
        print(full_path)
        try:
            # 检查文件是否存在且在static目录下
            if not os.path.exists(full_path) or not os.path.isfile(full_path):
                return self._create_response("404 Not Found", status_code=404)
            
            # 读取文件内容
            with open(full_path, 'rb') as f:
                content = f.read()
            
            # 获取文件的MIME类型
            content_type, _ = mimetypes.guess_type(full_path)
            if not content_type:
                content_type = 'application/octet-stream'
            
            # 创建响应头
            headers = {
                'Content-Type': content_type,
                'Content-Length': len(content),
                'Connection': 'close'
            }
            
            # 构建响应
            response_headers = '\r\n'.join([f'{k}: {v}' for k, v in headers.items()])
            response = f'HTTP/1.1 200 OK\r\n{response_headers}\r\n\r\n'.encode()
            return response + content
            
        except Exception as e:
            print(f"提供文件 {file_path} 时出错: {e}")
            return self._create_response("500 Internal Server Error", status_code=500)
    def _create_response(self, 
                        content: str,
                        status_code: int = 200,
                        content_type: str = 'text/html') -> bytes:
        """
        创建HTTP响应
        
        参数:
            content: 响应内容
            status_code: HTTP状态码,默认200
            content_type: 内容类型,默认'text/html'
            
        返回:
            bytes: 编码后的HTTP响应
        """
        status_messages = {
            200: 'OK',
            404: 'Not Found',
            500: 'Internal Server Error'
        }
        
        response = f"""HTTP/1.1 {status_code} {status_messages.get(status_code, 'OK')}
Content-Type: {content_type}
Content-Length: {len(content)}
Connection: close
Date: {time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime())}
Server: Python-Socket-Web-Server

{content}"""
        return response.encode()
    def stop(self) -> None:
        """
        停止服务器
        
        关闭HTTP和HTTPS服务器套接字
        """
        if self.http_server:
            self.http_server.close()
        if self.https_server:
            self.https_server.close()

配置服务启动

import os
import signal
import sys
from StaticFileServer import StaticFileServer
from server import WebServer
def setup_signal_handlers(server: WebServer) -> None:
    """
    设置信号处理器
    
    处理SIGINT和SIGTERM信号,优雅地关闭服务器
    
    参数:
        server: WebServer实例
    """
    def signal_handler(sig, frame):
        """
        信号处理函数
        
        处理中断信号,停止服务器并退出程序
        
        参数:
            sig: 信号编号
            frame: 当前栈帧
        """
        print("\n接收到中断信号,正在关闭服务器...")
        server.stop()
        sys.exit(0)
    
    # 注册信号处理器
    signal.signal(signal.SIGINT, signal_handler)  # 处理Ctrl+C
    signal.signal(signal.SIGTERM, signal_handler)  # 处理终止信号
def main():
    print( "Hello World")
    static_server = StaticFileServer()
    static_server.ensure_static_dir_exists()
    static_server.set_path("socket_tls/cert/")
    print(static_server.dir)
    static_server.setup_ssl_certificates('socket_tls/cert/cert.pem', 'socket_tls/cert/key.pem')
    try:
        server = WebServer(
            '0.0.0.0', 
            8080, 
            8443, 
            static_server.dir+'cert.pem', 
            static_server.dir+'key.pem',
            "socket_tls/static/",
        )
        # 设置信号处理器
        setup_signal_handlers(server)
        
        print("正在启动Web服务器...")
        server.start()
        print("\n服务器已启动:")
        print("\n可用路径:")
        print("- / - 主页")
        print("- /hello - 问候消息")
        print("- /time - 当前服务器时间")
    except Exception as e:
        print(f"启动服务器时出错: {e}")
        sys.exit(1)
if __name__ == '__main__':
    main()


博客评论
还没有人评论,赶紧抢个沙发~
发表评论
说明:请文明发言,共建和谐网络,您的个人信息不会被公开显示。