服务器
服务器怎么安装Redis?服务器安装Redis宝典
毵毵
2026-02-12
2天前
安装方式(推荐官方源安装)
方式一:APT 包管理器(推荐)
# 更新包列表sudo apt update# 安装 Redissudo apt install redis-server# 验证安装redis-server --version
方式二:Snap 包管理器
# 安装 Redissudo snap install redis# 验证安装snap list redis
方式三:从源码编译(最新版本)
# 安装编译依赖sudo apt install build-essential tcl# 下载最新源码(检查官网获取最新版本号)wget http://download.redis.io/redis-stable.tar.gztar xzf redis-stable.tar.gzcd redis-stable# 编译makemake test # 可选,运行测试# 安装到系统sudo make install# 验证安装redis-server --version
服务配置与启动
1. 启动 Redis 服务
# 启动服务sudo systemctl start redis-server# 设置开机自启sudo systemctl enable redis-server# 检查服务状态sudo systemctl status redis-server
2. 配置文件位置
# 默认配置文件路径/etc/redis/redis.conf# 备份原配置sudo cp /etc/redis/redis.conf /etc/redis/redis.conf.bak
3. 常用配置修改
# 编辑配置文件sudo nano /etc/redis/redis.conf
关键配置项:
# 绑定地址(允许远程访问)bind 0.0.0.0 # 注释掉或改为 0.0.0.0 允许远程连接# bind 127.0.0.1 ::1 # 只允许本地访问(默认)# 安全密码requirepass your_strong_password# 后台运行daemonize yes# 数据持久化save 900 1save 300 10save 60 10000# 日志级别loglevel noticelogfile /var/log/redis/redis-server.log# 数据库文件位置dir /var/lib/redis
4. 重启服务使配置生效
sudo systemctl restart redis-server验证安装
1. 检查端口占用sudo netstat -tlnp | grep :6379# 或sudo ss -tlnp | grep :63792. 连接测试# 使用 Redis CLIredis-cli# 在 Redis CLI 中测试127.0.0.1:6379> pingPONG # 成功响应127.0.0.1:6379> set test "Hello Redis"OK127.0.0.1:6379> get test"Hello Redis"127.0.0.1:6379> quit3. 远程连接测试(如果配置了 bind 0.0.0.0)# 从另一台机器连接redis-cli -h YOUR_SERVER_IP -p 6379 -a your_password
重要路径表格
| 类型 | 路径 | 说明 |
| 配置文件 | /etc/redis/redis.conf | 主配置文件 |
| 数据目录 | /var/lib/redis | RDB/AOF 文件存储位置 |
| 日志文件 | /var/log/redis/redis-server.log | 服务日志 |
| 服务名称 | redis-server | Systemd 服务名 |
1. 设置密码认证# 在 redis.conf 中添加requirepass your_strong_password# 重启服务sudo systemctl restart redis-server2. 绑定特定 IP# 在 redis.conf 中修改bind 127.0.0.1 YOUR_SERVER_IP # 限制访问来源3. 防火墙配置# 如果使用 UFWsudo ufw allow 6379/tcp # 允许 Redis 端口(谨慎开放)# 或限制特定 IP 访问sudo ufw allow from SPECIFIC_IP to any port 6379常用管理命令# 服务控制sudo systemctl start redis-server # 启动sudo systemctl stop redis-server # 停止sudo systemctl restart redis-server # 重启sudo systemctl status redis-server # 状态sudo systemctl disable redis-server # 禁用开机自启# 查看日志sudo tail -f /var/log/redis/redis-server.log# 测试配置文件语法redis-check-config /etc/redis/redis.conf
# 检查内存使用
redis-cli info memory性能基准测试
# Redis 自带性能测试工具redis-benchmark -h localhost -p 6379 -n 100000 -q
Python 连接测试
# 安装 Python Redis 客户端pip install redis# 测试脚本python3 -c "import redisr = redis.Redis(host='localhost', port=6379, db=0)r.set('test', 'Hello from Python!')print(r.get('test'))
import osclass RedisConfig:HOST = os.getenv('REDIS_HOST', 'x.x.x.x')PORT = int(os.getenv('REDIS_PORT', 6379))PASSWORD = os.getenv('REDIS_PASSWORD', 'Ms123!@#')DB = int(os.getenv('REDIS_DB', 0))MAX_CONNECTIONS = int(os.getenv('REDIS_MAX_CONNECTIONS', 10)) SOCKET_TIMEOUT = int(os.getenv('REDIS_SOCKET_TIMEOUT', 5))
# redis_utils.pyimport redisimport loggingfrom typing import Any, Optional, Union, List, Dictfrom contextlib import contextmanagerfrom config.RedisConfig import RedisConfigclass RedisUtils:"""Redis 工具类,提供常用的 Redis 操作方法"""def __init__(self,host: str = '8.136.110.137',port: int = 6379,password: str= 'Ms123!@#',db: int = 0,decode_responses: bool = True,connection_pool_size: int = 10,socket_timeout: int = 5):"""初始化 Redis 连接Args:host: Redis 服务器地址port: Redis 端口password: Redis 密码db: 数据库编号decode_responses: 是否解码响应为字符串connection_pool_size: 连接池大小socket_timeout: Socket 超时时间"""try:# 创建连接池pool = redis.ConnectionPool(host=host,port=port,password=password,db=1,decode_responses=decode_responses,max_connections=connection_pool_size,socket_timeout=socket_timeout)self.client = redis.Redis(connection_pool=pool)# 测试连接self.client.ping()logging.info(f"✅ Redis 连接成功: {host}:{port}")except redis.AuthenticationError:raise ConnectionError("Redis 认证失败,请检查密码")except redis.ConnectionError:raise ConnectionError(f"无法连接到 Redis 服务器: {host}:{port}")except Exception as e:raise ConnectionError(f"Redis 连接异常: {str(e)}")def ping(self) -> bool:"""测试连接"""try:return self.client.ping()except Exception as e:logging.error(f"Ping 失败: {e}")return False# ==================== String 操作 ====================def set(self, key: str, value: Any, ex: Optional[int] = None) -> bool:"""设置字符串值"""try:result = self.client.set(key, value, ex=ex)return result is Trueexcept Exception as e:logging.error(f"SET 操作失败: {e}")return Falsedef get(self, key: str) -> Optional[str]:"""获取字符串值"""try:return self.client.get(key)except Exception as e:logging.error(f"GET 操作失败: {e}")return Nonedef mget(self, keys: List[str]) -> List[Optional[str]]:"""批量获取多个键的值"""try:return self.client.mget(keys)except Exception as e:logging.error(f"MGET 操作失败: {e}")return []def exists(self, key: str) -> bool:"""检查键是否存在"""try:return self.client.exists(key) == 1except Exception as e:logging.error(f"EXISTS 操作失败: {e}")return Falsedef delete(self, *keys) -> int:"""删除一个或多个键"""try:return self.client.delete(*keys)except Exception as e:logging.error(f"DELETE 操作失败: {e}")return 0def expire(self, key: str, time: int) -> bool:"""设置键的过期时间"""try:return self.client.expire(key, time)except Exception as e:logging.error(f"EXPIRE 操作失败: {e}")return False# ==================== Hash 操作 ====================def hset(self, name: str, mapping: Dict[str, Any]) -> int:"""设置哈希表字段"""try:return self.client.hset(name, mapping=mapping)except Exception as e:logging.error(f"HSET 操作失败: {e}")return 0def hget(self, name: str, key: str) -> Optional[str]:"""获取哈希表字段值"""try:return self.client.hget(name, key)except Exception as e:logging.error(f"HGET 操作失败: {e}")return Nonedef hgetall(self, name: str) -> Dict[str, str]:"""获取哈希表所有字段和值"""try:return self.client.hgetall(name)except Exception as e:logging.error(f"HGETALL 操作失败: {e}")return {}def hdel(self, name: str, *keys) -> int:"""删除哈希表的一个或多个字段"""try:return self.client.hdel(name, *keys)except Exception as e:logging.error(f"HDEL 操作失败: {e}")return 0# ==================== List 操作 ====================def lpush(self, name: str, *values) -> int:"""向列表左侧插入元素"""try:return self.client.lpush(name, *values)except Exception as e:logging.error(f"LPUSH 操作失败: {e}")return 0def rpush(self, name: str, *values) -> int:"""向列表右侧插入元素"""try:return self.client.rpush(name, *values)except Exception as e:logging.error(f"RPUSH 操作失败: {e}")return 0def lpop(self, name: str) -> Optional[str]:"""从列表左侧弹出元素"""try:return self.client.lpop(name)except Exception as e:logging.error(f"LPOP 操作失败: {e}")return Nonedef rpop(self, name: str) -> Optional[str]:"""从列表右侧弹出元素"""try:return self.client.rpop(name)except Exception as e:logging.error(f"RPOP 操作失败: {e}")return Nonedef lrange(self, name: str, start: int, end: int) -> List[str]:"""获取列表指定范围的元素"""try:return self.client.lrange(name, start, end)except Exception as e:logging.error(f"LRANGE 操作失败: {e}")return []# ==================== Set 操作 ====================def sadd(self, name: str, *values) -> int:"""向集合添加元素"""try:return self.client.sadd(name, *values)except Exception as e:logging.error(f"SADD 操作失败: {e}")return 0def smembers(self, name: str) -> set:"""获取集合所有成员"""try:return self.client.smembers(name)except Exception as e:logging.error(f"SMEMBERS 操作失败: {e}")return set()def sismember(self, name: str, value: str) -> bool:"""检查元素是否在集合中"""try:return self.client.sismember(name, value)except Exception as e:logging.error(f"SISMEMBER 操作失败: {e}")return Falsedef srem(self, name: str, *values) -> int:"""从集合中移除元素"""try:return self.client.srem(name, *values)except Exception as e:logging.error(f"SREM 操作失败: {e}")return 0# ==================== Sorted Set 操作 ====================def zadd(self, name: str, mapping: Dict[str, float]) -> int:"""向有序集合添加元素"""try:return self.client.zadd(name, mapping)except Exception as e:logging.error(f"ZADD 操作失败: {e}")return 0def zrange(self, name: str, start: int, end: int, desc: bool = False) -> List[str]:"""获取有序集合指定范围的元素"""try:return self.client.zrange(name, start, end, desc=desc)except Exception as e:logging.error(f"ZRANGE 操作失败: {e}")return []def zscore(self, name: str, value: str) -> Optional[float]:"""获取有序集合元素的分数"""try:return self.client.zscore(name, value)except Exception as e:logging.error(f"ZSCORE 操作失败: {e}")return None# ==================== 通用操作 ====================def keys(self, pattern: str = '*') -> List[str]:"""获取匹配模式的所有键"""try:return self.client.keys(pattern)except Exception as e:logging.error(f"KEYS 操作失败: {e}")return []def flushdb(self) -> bool:"""清空当前数据库"""try:return self.client.flushdb() is Trueexcept Exception as e:logging.error(f"FLUSHDB 操作失败: {e}")return Falsedef info(self) -> Dict[str, Any]:"""获取 Redis 服务器信息"""try:return self.client.info()except Exception as e:logging.error(f"INFO 操作失败: {e}")return {}def close(self):"""关闭连接"""if hasattr(self, 'client'):self.client.close()# ==================== 使用示例 ====================def example_usage():"""使用示例"""import ostry:redis_client = RedisUtils(host=RedisConfig.HOST,port=RedisConfig.PORT,password=RedisConfig.PASSWORD,db=RedisConfig.DB)if redis_client.ping():print("✅ Redis 连接成功")else:print("❌ Redis 连接失败")return# String 操作示例print("\n--- String 操作 ---")redis_client.set("test_key", "Hello Redis!", ex=3600)value = redis_client.get("test_key")print(f"GET test_key: {value}")# Hash 操作示例print("\n--- Hash 操作 ---")redis_client.hset("user:1", {"name": "Alice", "age": "25"})user_info = redis_client.hgetall("user:1")print(f"User info: {user_info}")# List 操作示例print("\n--- List 操作 ---")redis_client.lpush("message_queue", "msg1", "msg2")messages = redis_client.lrange("message_queue", 0, -1)print(f"Messages: {messages}")# Set 操作示例print("\n--- Set 操作 ---")redis_client.sadd("tags", "python", "redis", "database")tags = redis_client.smembers("tags")print(f"Tags: {tags}")# 关闭连接redis_client.close()except ConnectionError as e:print(f"❌ 连接错误: {e}")except Exception as e:print(f"❌ 其他错误: {e}")if __name__ == "__main__":# 配置日志logging.basicConfig(level=logging.INFO)example_usage()
本文内容仅供参考,不构成任何专业建议。使用本文提供的信息时,请自行判断并承担相应风险。
分享文章



