返回文章列表
服务器

服务器怎么安装Redis?服务器安装Redis宝典

毵毵
2026-02-12
2天前

安装方式(推荐官方源安装)

方式一:APT 包管理器(推荐)


# 更新包列表sudo apt update# 安装 Redissudo apt install redis-server# 验证安装redis-server --version

方式二:Snap 包管理器


# 安装 Redissudo snap install redis# 验证安装snap list redis

方式三:从源码编译(最新版本)


# 安装编译依赖sudo apt install build-essential tcl# 下载最新源码(检查官网获取最新版本号)wget http://download.redis.io/redis-stable.tar.gztar xzf redis-stable.tar.gzcd redis-stable# 编译makemake test  # 可选,运行测试# 安装到系统sudo make install# 验证安装redis-server --version

服务配置与启动

1. 启动 Redis 服务


# 启动服务sudo systemctl start redis-server# 设置开机自启sudo systemctl enable redis-server# 检查服务状态sudo systemctl status redis-server

2. 配置文件位置


# 默认配置文件路径/etc/redis/redis.conf# 备份原配置sudo cp /etc/redis/redis.conf /etc/redis/redis.conf.bak

3. 常用配置修改


# 编辑配置文件sudo nano /etc/redis/redis.conf

关键配置项:


# 绑定地址(允许远程访问)bind 0.0.0.0  # 注释掉或改为 0.0.0.0 允许远程连接# bind 127.0.0.1 ::1  # 只允许本地访问(默认)# 安全密码requirepass your_strong_password# 后台运行daemonize yes# 数据持久化save 900 1save 300 10save 60 10000# 日志级别loglevel noticelogfile /var/log/redis/redis-server.log# 数据库文件位置dir /var/lib/redis

4. 重启服务使配置生效


sudo systemctl restart redis-server

验证安装


1. 检查端口占用sudo netstat -tlnp | grep :6379# 或sudo ss -tlnp | grep :63792. 连接测试# 使用 Redis CLIredis-cli# 在 Redis CLI 中测试127.0.0.1:6379> pingPONG  # 成功响应127.0.0.1:6379> set test "Hello Redis"OK127.0.0.1:6379> get test"Hello Redis"127.0.0.1:6379> quit3. 远程连接测试(如果配置了 bind 0.0.0.0)# 从另一台机器连接redis-cli -h YOUR_SERVER_IP -p 6379 -a your_password

重要路径表格

类型路径说明
配置文件/etc/redis/redis.conf主配置文件
数据目录/var/lib/redisRDB/AOF 文件存储位置
日志文件/var/log/redis/redis-server.log服务日志
服务名称redis-serverSystemd 服务名
1. 设置密码认证# 在 redis.conf 中添加requirepass your_strong_password# 重启服务sudo systemctl restart redis-server2. 绑定特定 IP# 在 redis.conf 中修改bind 127.0.0.1 YOUR_SERVER_IP  # 限制访问来源3. 防火墙配置# 如果使用 UFWsudo ufw allow 6379/tcp  # 允许 Redis 端口(谨慎开放)# 或限制特定 IP 访问sudo ufw allow from SPECIFIC_IP to any port 6379常用管理命令# 服务控制sudo systemctl start redis-server    # 启动sudo systemctl stop redis-server     # 停止sudo systemctl restart redis-server  # 重启sudo systemctl status redis-server   # 状态sudo systemctl disable redis-server  # 禁用开机自启# 查看日志sudo tail -f /var/log/redis/redis-server.log# 测试配置文件语法redis-check-config /etc/redis/redis.conf
# 检查内存使用
redis-cli info memory

性能基准测试


# Redis 自带性能测试工具redis-benchmark -h localhost -p 6379 -n 100000 -q

Python 连接测试


# 安装 Python Redis 客户端pip install redis# 测试脚本python3 -c "import redisr = redis.Redis(host='localhost', port=6379, db=0)r.set('test', 'Hello from Python!')print(r.get('test'))
import osclass RedisConfig:    HOST = os.getenv('REDIS_HOST', 'x.x.x.x')    PORT = int(os.getenv('REDIS_PORT', 6379))    PASSWORD = os.getenv('REDIS_PASSWORD', 'Ms123!@#')    DB = int(os.getenv('REDIS_DB', 0))    MAX_CONNECTIONS = int(os.getenv('REDIS_MAX_CONNECTIONS', 10))    SOCKET_TIMEOUT = int(os.getenv('REDIS_SOCKET_TIMEOUT', 5))
  • # redis_utils.pyimport redisimport loggingfrom typing import Any, Optional, Union, List, Dictfrom contextlib import contextmanagerfrom config.RedisConfig import RedisConfigclass RedisUtils:    """    Redis 工具类,提供常用的 Redis 操作方法    """    def __init__(self,                 host: str = '8.136.110.137',                 port: int = 6379,                 password: str= 'Ms123!@#',                 db: int = 0,                 decode_responses: bool = True,                 connection_pool_size: int = 10,                 socket_timeout: int = 5):        """        初始化 Redis 连接        Args:            host: Redis 服务器地址            port: Redis 端口            password: Redis 密码            db: 数据库编号            decode_responses: 是否解码响应为字符串            connection_pool_size: 连接池大小            socket_timeout: Socket 超时时间        """        try:            # 创建连接池            pool = redis.ConnectionPool(                host=host,                port=port,                password=password,                db=1,                decode_responses=decode_responses,                max_connections=connection_pool_size,                socket_timeout=socket_timeout            )            self.client = redis.Redis(connection_pool=pool)            # 测试连接            self.client.ping()            logging.info(f"✅ Redis 连接成功: {host}:{port}")        except redis.AuthenticationError:            raise ConnectionError("Redis 认证失败,请检查密码")        except redis.ConnectionError:            raise ConnectionError(f"无法连接到 Redis 服务器: {host}:{port}")        except Exception as e:            raise ConnectionError(f"Redis 连接异常: {str(e)}")    def ping(self) -> bool:        """测试连接"""        try:            return self.client.ping()        except Exception as e:            logging.error(f"Ping 失败: {e}")            return False    # ==================== String 操作 ====================    def set(self, key: str, value: Any, ex: Optional[int] = None) -> bool:        """设置字符串值"""        try:            result = self.client.set(key, value, ex=ex)            return result is True        except Exception as e:            logging.error(f"SET 操作失败: {e}")            return False    def get(self, key: str) -> Optional[str]:        """获取字符串值"""        try:            return self.client.get(key)        except Exception as e:            logging.error(f"GET 操作失败: {e}")            return None    def mget(self, keys: List[str]) -> List[Optional[str]]:        """批量获取多个键的值"""        try:            return self.client.mget(keys)        except Exception as e:            logging.error(f"MGET 操作失败: {e}")            return []    def exists(self, key: str) -> bool:        """检查键是否存在"""        try:            return self.client.exists(key) == 1        except Exception as e:            logging.error(f"EXISTS 操作失败: {e}")            return False    def delete(self, *keys) -> int:        """删除一个或多个键"""        try:            return self.client.delete(*keys)        except Exception as e:            logging.error(f"DELETE 操作失败: {e}")            return 0    def expire(self, key: str, time: int) -> bool:        """设置键的过期时间"""        try:            return self.client.expire(key, time)        except Exception as e:            logging.error(f"EXPIRE 操作失败: {e}")            return False    # ==================== Hash 操作 ====================    def hset(self, name: str, mapping: Dict[str, Any]) -> int:        """设置哈希表字段"""        try:            return self.client.hset(name, mapping=mapping)        except Exception as e:            logging.error(f"HSET 操作失败: {e}")            return 0    def hget(self, name: str, key: str) -> Optional[str]:        """获取哈希表字段值"""        try:            return self.client.hget(name, key)        except Exception as e:            logging.error(f"HGET 操作失败: {e}")            return None    def hgetall(self, name: str) -> Dict[str, str]:        """获取哈希表所有字段和值"""        try:            return self.client.hgetall(name)        except Exception as e:            logging.error(f"HGETALL 操作失败: {e}")            return {}    def hdel(self, name: str, *keys) -> int:        """删除哈希表的一个或多个字段"""        try:            return self.client.hdel(name, *keys)        except Exception as e:            logging.error(f"HDEL 操作失败: {e}")            return 0    # ==================== List 操作 ====================    def lpush(self, name: str, *values) -> int:        """向列表左侧插入元素"""        try:            return self.client.lpush(name, *values)        except Exception as e:            logging.error(f"LPUSH 操作失败: {e}")            return 0    def rpush(self, name: str, *values) -> int:        """向列表右侧插入元素"""        try:            return self.client.rpush(name, *values)        except Exception as e:            logging.error(f"RPUSH 操作失败: {e}")            return 0    def lpop(self, name: str) -> Optional[str]:        """从列表左侧弹出元素"""        try:            return self.client.lpop(name)        except Exception as e:            logging.error(f"LPOP 操作失败: {e}")            return None    def rpop(self, name: str) -> Optional[str]:        """从列表右侧弹出元素"""        try:            return self.client.rpop(name)        except Exception as e:            logging.error(f"RPOP 操作失败: {e}")            return None    def lrange(self, name: str, start: int, end: int) -> List[str]:        """获取列表指定范围的元素"""        try:            return self.client.lrange(name, start, end)        except Exception as e:            logging.error(f"LRANGE 操作失败: {e}")            return []    # ==================== Set 操作 ====================    def sadd(self, name: str, *values) -> int:        """向集合添加元素"""        try:            return self.client.sadd(name, *values)        except Exception as e:            logging.error(f"SADD 操作失败: {e}")            return 0    def smembers(self, name: str) -> set:        """获取集合所有成员"""        try:            return self.client.smembers(name)        except Exception as e:            logging.error(f"SMEMBERS 操作失败: {e}")            return set()    def sismember(self, name: str, value: str) -> bool:        """检查元素是否在集合中"""        try:            return self.client.sismember(name, value)        except Exception as e:            logging.error(f"SISMEMBER 操作失败: {e}")            return False    def srem(self, name: str, *values) -> int:        """从集合中移除元素"""        try:            return self.client.srem(name, *values)        except Exception as e:            logging.error(f"SREM 操作失败: {e}")            return 0    # ==================== Sorted Set 操作 ====================    def zadd(self, name: str, mapping: Dict[str, float]) -> int:        """向有序集合添加元素"""        try:            return self.client.zadd(name, mapping)        except Exception as e:            logging.error(f"ZADD 操作失败: {e}")            return 0    def zrange(self, name: str, start: int, end: int, desc: bool = False) -> List[str]:        """获取有序集合指定范围的元素"""        try:            return self.client.zrange(name, start, end, desc=desc)        except Exception as e:            logging.error(f"ZRANGE 操作失败: {e}")            return []    def zscore(self, name: str, value: str) -> Optional[float]:        """获取有序集合元素的分数"""        try:            return self.client.zscore(name, value)        except Exception as e:            logging.error(f"ZSCORE 操作失败: {e}")            return None    # ==================== 通用操作 ====================    def keys(self, pattern: str = '*') -> List[str]:        """获取匹配模式的所有键"""        try:            return self.client.keys(pattern)        except Exception as e:            logging.error(f"KEYS 操作失败: {e}")            return []    def flushdb(self) -> bool:        """清空当前数据库"""        try:            return self.client.flushdb() is True        except Exception as e:            logging.error(f"FLUSHDB 操作失败: {e}")            return False    def info(self) -> Dict[str, Any]:        """获取 Redis 服务器信息"""        try:            return self.client.info()        except Exception as e:            logging.error(f"INFO 操作失败: {e}")            return {}    def close(self):        """关闭连接"""        if hasattr(self, 'client'):            self.client.close()# ==================== 使用示例 ====================def example_usage():    """使用示例"""    import os    try:        redis_client = RedisUtils(            host=RedisConfig.HOST,            port=RedisConfig.PORT,            password=RedisConfig.PASSWORD,            db=RedisConfig.DB    )        if redis_client.ping():            print("✅ Redis 连接成功")        else:            print("❌ Redis 连接失败")            return        # String 操作示例        print("\n--- String 操作 ---")        redis_client.set("test_key", "Hello Redis!", ex=3600)        value = redis_client.get("test_key")        print(f"GET test_key: {value}")        # Hash 操作示例        print("\n--- Hash 操作 ---")        redis_client.hset("user:1", {"name": "Alice", "age": "25"})        user_info = redis_client.hgetall("user:1")        print(f"User info: {user_info}")        # List 操作示例        print("\n--- List 操作 ---")        redis_client.lpush("message_queue", "msg1", "msg2")        messages = redis_client.lrange("message_queue", 0, -1)        print(f"Messages: {messages}")        # Set 操作示例        print("\n--- Set 操作 ---")        redis_client.sadd("tags", "python", "redis", "database")        tags = redis_client.smembers("tags")        print(f"Tags: {tags}")        # 关闭连接        redis_client.close()    except ConnectionError as e:        print(f"❌ 连接错误: {e}")    except Exception as e:        print(f"❌ 其他错误: {e}")if __name__ == "__main__":    # 配置日志    logging.basicConfig(level=logging.INFO)    example_usage()
    
    


    本文内容仅供参考,不构成任何专业建议。使用本文提供的信息时,请自行判断并承担相应风险。

    分享文章
    合作伙伴

    本站所有广告均是第三方投放,详情请查询本站用户协议