rediSQL使用记录

rediSQL使用记录

背景
最近我们组在做自动化数据可视化产品,一次配置,可自动适配实时、离线数据, 实时数据存储在Redis,当前端图表配置项功能越来越复杂时,基于Key的查询接口实现上越来越臃肿,越来越难以跟上前端功能的多样性。

比如前端要自动生成表格,假设数据有俩个维度(platform端,city省份),2个指标(pv浏览量,click_pv点击)

KEY的格式分别为:

platform_city_pv value
platform_city_click_pv value
第一天,PM要求表格展示4列明细数据就OK

实时接口实现步骤:

需要知道每个维度都有哪些枚举值,比如端:app/m/pc/other,省市有32个,这些枚举值我可以从维表获取
拼key,大约需要拼 2 * 4 * 32 = 256
第二天,PM提出新要求,要求按照某列排序

内存排序
第三天,PM要求加一列,显示点击率 ( click_pv/pv)

衍生指标计算,支持公式定义的指标
第四天,点击率要求保留2位小数

格式化处理
第五天,点击率要百分制

第六天,按照点击率倒序显示表格

第七天,表格要做分页,一页10行

第八天,表格需要过滤掉点击率小于5%的行数据

第九天,。。。。

第十天,卒。

你会发现接口再屌,也难以应对丰富的前端功能

领导知道了我们接口的苦衷,马上喊过去开会,讨论半天确实没有太好的方案,不过嘴里一直叨叨 ”有可能在Redis直接执行SQL呢”..

说者无意,听者有心。

google起来。查到了我们今天的主角:RediSQL

git地址: https://github.com/RedBeardLab/rediSQL

git真香.

介绍
rediSQL是Redis的SQLite数据库的嵌入模块,命令支持Redis几乎所有的类型和功能,支持SQL大部分语句, 使用也特别简单,下载,build后,启动redis时,加载编译后的so,就可以直接redis-cli命令行下执行SQL

安装
安装有两种方式

git clone & build * release
docker方式
由于rediSQL需要rust环境,比较繁琐,我直接使用了docker hub的可用image.

dalongrong/redisql (可用)
rediSQL开放的镜像远程服务网络有问题

1
2
3
4
5
6
7
8
docker pull dalongrong/redisql # ( 默认你有可用docker
docker run -p 127.0.0.1:6379:6379 -it dalongrong/redisql ( 暴露6379端口)
redis-cli -p 6379 ( 连接的就是镜像中启动的redis)
试用
创建数据库:REDISQL.CREATE_DB helloworld
创建表:REDISQL.EXEC helloworld "CREATE TABLE abc(A INT, B TEXT);"
查询:REDISQL.EXEC helloworld "select * from abc"
测试:SELECT position, dt, sum(req_pv) as req_pv FROM adm_rcmd_all_di GROUP BY dt,position having(sum(req_pv) > 432683005803) order by position,req_pv desc limit 100

SQL大部分功能都可以很好支持
SDK
想与我们的工程结合肯定需要有SDK的支持,不可能命令行操作。现在rediSQL支持三种语言

Python
Golang
Node.js
SDK实现方式都差不多,我们拿python来看一下使用方式。

获取连接(建议生产时使用连接池)

def conn():
    return redis.StrictRedis(host='localhost', port=6379, db=0)
执行操作命令
redis = conn() # 获取连接
redis.execute("REDISQL.CREATE_DB", "db_1") # 创建数据库
query = "CREATE TABLE IF NOT EXISTS table_1(id integer primary key, age int, name text)" 
redis.execute("REDISQL.EXEC", "db_1", query) # 创建表
q = f"INSERT INTO table_1 VALUES(1,21, 'zhangsan');"
self.redis.execute_command("REDISQL.EXEC", 'db_1', q)
性能测试
# -*- coding: utf-8 -*-
# Author: kaizhang01
import asyncio
import aioredis
import random
import redis
import time


def conn():
    loop = asyncio.get_event_loop()
    conn_co = aioredis.create_pool('redis://localhost', minsize=10, maxsize=300, loop=loop)
    redis_co = asyncio.gather(*[conn_co])
    redis = loop.run_until_complete(redis_co)
    redis = redis[0]
    return redis


def new_conn():
    # return redis.Redis(host='localhost', port=6379, db=1)
    # return redis.StrictRedis(connection_pool =
    #         redis.BlockingConnectionPool(
    #             max_connections = 15,
    #             timeout = 10))
    return redis.StrictRedis(host='localhost', port=6379, db=0)


class RedisSQL():
    """redis sql test"""

    def __init__(self):
        self.table_name = 'adm_rcmd_all_di'
        self.redis = new_conn()
        self.dts = [
            "2019-09-10", "2019-09-11",
            "2019-09-12", "2019-09-13",
            "2019-09-14", "2019-09-15",
            "2019-09-16", "2019-09-17", "2019-09-18",
        ]
        self.authors = [
            "lisi",
            "zhangsan",
            "wangqiang"
        ]
        self.position = [
            '0300', '0200'
        ]

    def db_init(self, run=False):
        if run:
            print(self.table_name)
            self.redis.execute("REDISQL.CREATE_DB", self.table_name)  # APP实时监测整体表
            query = f"CREATE TABLE IF NOT EXISTS {self.table_name}(id integer primary key, position text, dt text, req_pv int, sign_show_pv int, click_pv int);"
            print(query)
            self.redis.execute("REDISQL.EXEC", self.table_name, query)
            # statement = f"INSERT INTO {table_name} VALUES(1,'1111');"
            # print(statement)
            # self.redis.execute("REDISQL.EXEC", table_name, query)
        return self

    def main(self, run=False):
        """测试性能"""
        # 写入百万数据,测试查询
        if run:
            for i in range(1, 1200000):
                _p = self.position[random.randint(0, len(self.position) - 1)]
                _dt = self.dts[random.randint(0, len(self.dts) - 1)]
                req_pv = random.randint(5000000, 8000000)
                sign_show_pv = random.randint(3000000, 5000000)
                click_pv = random.randint(1000000, 2000000)
                row = f"{i}, '{_p}', '{_dt}', {req_pv}, {sign_show_pv}, {click_pv}"
                # print(row)
                q = f"INSERT INTO {self.table_name} VALUES({row});"
                self.redis.execute_command("REDISQL.EXEC", self.table_name, q)
        return self

    def select(self):
        # q1 = f"SELECT dt, position,sum(req_pv) FROM {self.table_name} GROUP BY position,dt"
        q1 = f"SELECT position, dt, sum(req_pv) as req_pv FROM {self.table_name} GROUP BY dt,position having(sum(req_pv) > 432683005803)  order by position,req_pv desc limit 100"
        print(q1, "\n", time.time())
        X = self.redis.execute_command("REDISQL.EXEC", self.table_name, q1)
        for x in X:
            print(x)
        print(time.time())


if __name__ == '__main__':
    RedisSQL().db_init(True).main(True).select()
Todo: 更新数据测试

结论
单线程导入百万数据应该在2分钟左右
查询百万:1s
复杂SQL: 1s
1
SELECT position, dt, sum(req_pv) as req_pv FROM adm_rcmd_all_di GROUP BY dt,position having(sum(req_pv) > 432683005803)  order by position,req_pv desc limit 100
对于实时数据一般只展示当天最新数据,所以在数据量有限的情况下,比如百万级,可以一试
查询rediSQL网上资料很少,wiki给的命令很多也是不靠谱,这也是我要记录下来的原因
没有在生产环境得到验证,选择使用可以,但前期一定要有灾备方案.
命裏有時終須有,命裏無時莫強求

end.

Recommended Posts