前言

因为业务调整,在开发工作的基础上,我同时还要充当公司云平台的客服。

作为客服,在提供技术支持的同时,每周和每月我还要对客服系统的相关数据进行统计汇报。

我们使用的是udesk的客服系统软件,支持手动导出数据和出一些简单的统计图,但是感觉不够自动化和定制化。

正好发现udesk官方提供了api,于是决定通过python使用api抓取udesk客服系统数据并同步到本地mysql数据库,保存到本地后也方便进一步对数据进行分析,绘制图表等等。

数据库接口

首先写个数据库接口,使用了 sqlalchemy 连接 mysql ,并使用 pandas 直接读写数据表。

mysqlconn.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import re
import logging
import sqlalchemy as sql
import pandas as pd


class mysqlconn:
    '''MySQL data-access helper built on sqlalchemy + pandas.'''

    @staticmethod
    def map_dtype(df):
        '''
        Map pandas dtypes to default sqlalchemy column types.
        Reference mapping from sqlalchemy types to MySQL types:
            types.Integer: 'INT',
            types.BigInteger: 'BIGINT',
            types.Float: 'FLOAT',
            types.Numeric: 'DECIMAL',
            types.Boolean: 'TINYINT',
            types.String: 'VARCHAR',
            types.Text: 'TEXT',
            types.Unicode: 'VARCHAR',
            types.UnicodeText: 'MEDIUMTEXT',
            types.Date: 'DATE',
            types.DateTime: 'DATETIME',
            types.Time: 'TIME',
            types.Binary: 'BLOB',
            types.LargeBinary: 'LONGBLOB'
        '''
        dtypedict = {}
        for col, dtype in zip(df.columns, df.dtypes):
            kind = str(dtype)
            if "float" in kind:
                dtypedict[col] = sql.FLOAT(precision=2, asdecimal=True)
            elif "int" in kind:
                dtypedict[col] = sql.INTEGER()
            elif "datetime" in kind:
                dtypedict[col] = sql.DATETIME()
            elif "bool" in kind:
                dtypedict[col] = sql.BOOLEAN()
            elif "list" in col.lower() or "content" in col.lower():
                # columns holding serialized lists / long content need TEXT
                dtypedict[col] = sql.TEXT()
            else:
                dtypedict[col] = sql.String(50)
        return dtypedict

    @staticmethod
    def map_mysql(df):
        '''Map pandas dtypes to default MySQL column type names (as strings).'''
        dtypedict = {}
        for col, dtype in zip(df.columns, df.dtypes):
            kind = str(dtype)
            if "float" in kind:
                dtypedict[col] = "DECIMAL(10, 2)"
            elif "int" in kind:
                dtypedict[col] = "INT"
            elif "datetime" in kind:
                dtypedict[col] = "DATETIME"
            elif "bool" in kind:
                dtypedict[col] = "TINYINT"
            elif "list" in col.lower() or "content" in col.lower():
                dtypedict[col] = "TEXT"
            else:
                dtypedict[col] = "VARCHAR(50)"
        return dtypedict

    @staticmethod
    def insert_data(table, conn, keys, data_iter):
        '''
        Custom insert method for pd.to_sql: INSERT IGNORE so that rows
        whose primary key already exists are skipped instead of raising.
        Returns the number of rows actually inserted.
        '''
        data = [dict(zip(keys, row)) for row in data_iter]
        stmt = sql.insert(table.table).values(data).prefix_with("IGNORE")
        result = conn.execute(stmt)
        return result.rowcount

    def __init__(self, logger=None):
        # connection credentials (placeholders — fill in real values)
        mysql_username = 'mysql_username'
        mysql_password = 'mysql_password'
        mysql_host = 'mysql_host'
        port = 3306
        mysql_db = 'mysql_db'
        # initialize the engine; uses pymysql underneath
        self.engine = sql.create_engine(
            'mysql+pymysql://{}:{}@{}:{}/{}'.format(mysql_username,
                                                    mysql_password,
                                                    mysql_host, port,
                                                    mysql_db),
            pool_size=10,
            pool_recycle=3600)
        self.con = self.engine.connect()
        # retry bookkeeping for to_sql's missing-column recovery
        self.retry = 0
        self.missing_column = ""
        self.logger = logging.getLogger() if logger is None else logger

    def __del__(self):
        # __init__ may have failed before self.con was assigned;
        # never raise from a destructor
        con = getattr(self, "con", None)
        if con is not None:
            try:
                con.close()
            except Exception:
                pass

    def table_exists(self, name):
        '''Return True if table `name` exists; log a warning otherwise.'''
        ins = sql.inspect(self.engine)
        ret = ins.dialect.has_table(self.con, name)
        if not ret:
            # logging.warn is a deprecated alias of warning
            self.logger.warning(f'Table "{name}" not exists.')
        return ret

    def new(self, name, df, key, map_dtype=None):
        '''
        (Re)create a table from `df` with `key` as primary key,
        dropping any existing table of the same name, then insert the data.
        `map_dtype` lets callers override column types per column name.
        '''
        self.con.execute(sql.text(f"DROP TABLE IF EXISTS `{name}`"))
        dtypedict = self.map_dtype(df)
        if map_dtype is not None:
            # caller-provided overrides take precedence
            dtypedict.update(map_dtype)
        columns = [sql.Column(key, dtypedict[key],
                              primary_key=True, autoincrement=False)]
        columns += [sql.Column(col, dtypedict[col]) for col in df.columns
                    if col != key]
        metadata = sql.MetaData()
        mytable = sql.Table(name, metadata, *columns)
        # NOTE: binding the raw Engine here worked in an interactive console
        # but hung when run as a script; binding the Connection works.
        metadata.create_all(bind=self.con, tables=[mytable], checkfirst=True)
        self.to_sql(name, df, if_exists='append')

    def execute(self, sql, params=None):
        '''
        Execute a SQL statement and return the result object so callers
        can call .fetchall() on it.  `params` are optional bind parameters.
        '''
        if params is None:
            return self.con.execute(sql)
        return self.con.execute(sql, params)

    def to_sql(self, table, df: pd.DataFrame, if_exists='append'):
        '''
        Write `df` into `table`.
        if_exists is one of:
          1. fail: do nothing if the table exists
          2. replace: drop and recreate the table, then insert
          3. append: insert into the existing table (create it if missing)
        If the insert fails because a column is missing in the table,
        ALTER the table to add it and retry (at most twice per column).
        '''
        try:
            # Must use the Engine here: using engine.connect() made inserts
            # into a fresh table fail with
            # (pymysql.err.IntegrityError) (1062, "Duplicate entry ...")
            df.to_sql(table, con=self.engine, if_exists=if_exists,
                      index=False, method=self.insert_data)
        except sql.exc.OperationalError as e:
            # e.g. Unknown column 'surveyOptionId' in 'field list'
            # -> add the missing column and retry
            if "Unknown column" in str(e):
                pattern = r'Unknown column \'([^\'"]+)\' in \'field list\''
                match = re.search(pattern, str(e))
                # give up after the same column fails twice in a row
                if match and self.retry < 2:
                    missing_column = match.group(1)
                    dtypedict = self.map_mysql(df)
                    self.logger.warning(
                        f"missing column '{missing_column}' in '{table}'")
                    ddl = (f"ALTER TABLE `{table}` ADD COLUMN "
                           f"`{missing_column}` {dtypedict[missing_column]}")
                    self.con.execute(sql.text(ddl))
                    if self.missing_column == missing_column:
                        self.retry += 1
                    else:
                        self.missing_column = missing_column
                        self.retry = 0
                    self.to_sql(table, df, if_exists)
                    self.retry = 0
                else:
                    raise e
            else:
                raise e

    def read_sql(self, sql):
        '''Read a whole table (or the result of a query) into a DataFrame.'''
        return pd.read_sql(sql, con=self.con)

udesk接口

  1. 写一些常用静态函数

basef.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import json
import numpy as np


class basef:
    '''Base class with a few shared static utility functions.'''

    @staticmethod
    def js_decode(o):
        '''
        `default=` hook for json.dumps: convert numpy scalars/arrays to
        native Python types.  Generalized from np.int64-only to all numpy
        integer, floating and array types.  Unknown types still fall
        through to None (serialized as null), preserving the original
        silent-fallback behavior.
        '''
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)
        if isinstance(o, np.ndarray):
            return o.tolist()
        return None

    @staticmethod
    def update(dic: dict, key, value):
        '''Set dic[key] = value, skipping None values; return dic.'''
        if value is not None:
            dic[key] = value
        return dic

    @staticmethod
    def df2str(df):
        '''
        Serialize list/dict cells of object-dtype columns to JSON strings
        (in place) so the frame can be stored in MySQL; return df.
        '''
        def serialize(cell):
            if isinstance(cell, (list, dict)):
                return json.dumps(cell, default=basef.js_decode,
                                  ensure_ascii=False)
            return cell

        for col in df.columns:
            if str(df[col].dtype) == "object":
                df[col] = df[col].apply(serialize)
        return df
  2. 基础接口,定义了验证方法,和基础的爬取方法

KMAuth.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import json
import time
import hashlib
import logging
import requests
from basef import basef

class KMAuth(basef):
    '''Base udesk KM API client: auth signature + rate-limited requests.'''

    def __init__(self, logger=None) -> None:
        # API endpoint and credentials (placeholders — fill in real values)
        self.kmhost = "https://km.s4.udesk.cn/api"
        self.email = "email"
        self.km_api_token = "km_api_token"
        # timestamp of the last request, used for client-side throttling
        self.timestamp = 0
        # minimum number of seconds between two requests
        self.wait = 1
        self.logger = logging.getLogger() if logger is None else logger

    def kmauth(self):
        '''
        Build the auth parameters required by every API call:
        sign = sha1("{email}&{token}&{timestamp}").
        '''
        timestamp = int(time.time())
        raw = f"{self.email}&{self.km_api_token}&{timestamp}"
        return {
            "email": self.email,
            "timestamp": timestamp,
            "sign": hashlib.sha1(raw.encode()).hexdigest()
        }

    def url(self, url, params, wait, rep=0, method="get", data=None):
        '''
        Perform one API request; `method` supports get/post/put/delete.
        On a non-200 business code the call is retried once after `wait`
        seconds; a second failure raises ValueError with the response.
        '''
        # client-side throttle: ensure >= `wait` seconds since last request
        detime = int(time.time()) - self.timestamp
        if detime < wait:
            time.sleep(wait - detime)
        self.timestamp = int(time.time())
        if method == "get":
            res = requests.get(url=url, params=params)
        elif method == "post":
            headers = {"Content-Type": "application/json"}
            res = requests.post(url, data=json.dumps(
                data, default=self.js_decode), params=params, headers=headers)
        elif method == "put":
            res = requests.put(url=url, data=params)
        elif method == "delete":
            res = requests.delete(url=url, params=params)
        dic = json.loads(res.content.decode())
        rep += 1
        if dic.get("code") == 200:
            return dic
        elif rep < 2:
            self.logger.warning(dic)
            time.sleep(wait)
            # BUG FIX: the original dropped the retry's result — a
            # successful retry returned None to the caller.
            return self.url(url, params, wait, rep, method, data)
        else:
            # two consecutive failures: surface the API response
            raise ValueError(dic)
  3. udesk机器人接口类

KMRobot.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import os
import pandas as pd
import sqlalchemy as sql
from mysqlconn import mysqlconn
from KMAuth import KMAuth

class KMRobot(KMAuth, mysqlconn):
    '''udesk robot API client: fetches robot data and syncs it to MySQL.'''

    # Default filter when fetching sessions: only sessions where the
    # customer actually sent at least one message.
    Default_condition = {
        "field": "customerMsgCount",
        "operator": "greater_than",
        "value": 0
    }
    # channelList is normally a fixed value for a given robot;
    # it can be retrieved with self.getrobots().
    channelList = [{'robotId': 6901,
                    'name': '系统默认场景',
                    'type': 1,
                    'id': 8552,
                    'deleteFlag': 0}]

    @staticmethod
    def auto_dtype(df: pd.DataFrame):
        '''Convert every column whose name contains "time" to datetime64.'''
        for col in df.columns:
            if "time" in col.lower():
                df[col] = pd.to_datetime(df[col])
        return df

    def __init__(self, logger=None) -> None:
        mysqlconn.__init__(self)
        KMAuth.__init__(self, logger)
        self.wait = 2
        self.pageNum = 1
        # appID can be obtained with self.getapp()
        self.appID = "appID"
        # robotId can be obtained with self.getrobots() (needs appID first)
        self.robotId = 6901

    def _api_url(self, api):
        '''
        Join host and endpoint.  os.path.join is deliberately NOT used:
        it would produce backslashes in the URL on Windows.
        '''
        return f"{self.kmhost}/{api}"

    def set_robot(self, robotId):
        self.robotId = robotId

    def set_app(self, appID):
        self.appID = appID

    def getapp(self, page=1, page_size=30):
        '''List the applications under the account.'''
        new_url = self._api_url("v1/apps")
        params = self.kmauth()
        params = self.update(params, "pageNum", page)
        params = self.update(params, "pageSize", page_size)
        return self.url(new_url, params, wait=self.wait, method="get")

    def getrobots(self):
        '''List the robots under the current application.'''
        new_url = self._api_url("v1/robots")
        params = self.kmauth()
        params = self.update(params, "appID", self.appID)
        return self.url(new_url, params, wait=self.wait, method="get")

    def getlabels(self):
        '''Question labels endpoint (apparently unused).'''
        new_url = self._api_url("v1/questionLabels")
        params = self.kmauth()
        params = self.update(params, "robotId", self.robotId)
        return self.url(new_url, params, wait=self.wait, method="get")

    def getctg(self):
        '''Fetch the question categories as a DataFrame.'''
        new_url = self._api_url("v1/categories")
        params = self.kmauth()
        params = self.update(params, "robotId", self.robotId)
        content = self.url(new_url, params, wait=self.wait, method="get")
        res = pd.DataFrame(content["data"])
        return self.auto_dtype(res)

    def ctg2mysql(self):
        '''Sync the question categories into the database.'''
        df = self.getctg()
        df = self.df2str(df)
        self.new("categories", df, key="id")

    def getactg(self, id):
        '''Fetch a single question category (rarely useful).'''
        new_url = self._api_url(f"v1/categories/{id}")
        params = self.kmauth()
        params = self.update(params, "robotId", self.robotId)
        return self.url(new_url, params, wait=self.wait, method="get")

    def postctg(self, name, parentId=81765):
        '''Create a new question category; returns its id.'''
        new_url = self._api_url("v1/categories")
        params = self.kmauth()
        data = {}
        data = self.update(data, "robotId", self.robotId)
        data = self.update(data, "name", name)
        data = self.update(data, "parentId", parentId)
        # classify: 1 = knowledge-base category, 2 = chit-chat category
        data = self.update(data, "classify", 1)
        content = self.url(new_url, params, data=data,
                           wait=self.wait, method="post")
        return content["data"]["id"]

    def delactg(self, id):
        '''Delete a question category.'''
        new_url = self._api_url(f"v1/categories/{id}")
        params = self.kmauth()
        params = self.update(params, "robotId", self.robotId)
        self.url(new_url, params, wait=self.wait, method="delete")

    def updatectg(self, name, parentId=81765):
        '''
        Create the category only if it is not in the local DB yet;
        return its id either way.
        '''
        query = sql.text("SELECT id FROM categories WHERE name = :name")
        # BUG FIX: mysqlconn.execute() returns None, so the original
        # `.fetchall()` on it crashed; query the connection directly.
        res = self.con.execute(query, {"name": name}).fetchall()
        if not res:
            id = self.postctg(name, parentId)
            self.ctg2mysql()
            return id
        return res[0][0]

    def addfaq(self, category, content, answer,
               keyword=None, suggestsList=None):
        '''Create a question if it does not exist yet; return its id.'''
        query = sql.text("SELECT id FROM questions WHERE content = :content")
        # see updatectg: query the connection directly for a result object
        res = self.con.execute(query, {"content": content}).fetchall()
        if res:
            return res[0][0]
        new_url = self._api_url("v1/questions")
        params = self.kmauth()
        if isinstance(category, list):
            # a list is a category path; create each level under the last
            categoryId = 81765
            for level in category:
                categoryId = self.updatectg(level, categoryId)
        else:
            categoryId = self.updatectg(category)
        if isinstance(suggestsList, str):
            suggestsList = suggestsList.split(",")
        data = {}
        data = self.update(data, "robotId", self.robotId)
        data = self.update(data, "categoryId", categoryId)
        data = self.update(data, "content", content)
        data = self.update(data, "type", 1)
        data = self.update(data, "answer", {"content": answer, "type": 2})
        data = self.update(data, "channelList", self.channelList)
        data = self.update(data, "suggestType", 1)
        data = self.update(data, "keyword", keyword)
        data = self.update(data, "suggestsList", suggestsList)
        content = self.url(new_url, params, data=data,
                           wait=self.wait, method="post")
        if content["message"].lower() == "ok":
            return content["data"]["id"]
        raise ValueError(content["message"])

    def faqlist(self, robotId=6901):
        '''Fetch the full question list, paging via self.pageNum.'''
        new_url = self._api_url("v1/questions/search")
        params = self.kmauth()
        data = {}
        pageNum = self.pageNum
        pageSize = 100
        data = self.update(data, "robotId", robotId)
        data = self.update(data, "pageNum", pageNum)
        data = self.update(data, "pageSize", pageSize)
        content = self.url(new_url, params, data=data,
                           wait=self.wait, method="post")
        total = content["paging"]["total"]
        res = pd.DataFrame(content["data"])
        res = self.auto_dtype(res)
        if total - pageNum * pageSize > 0:
            # more pages remain: recurse with the next page number
            self.pageNum += 1
            res = pd.concat([res, self.faqlist(robotId)])
        else:
            self.pageNum = 1
        return res

    def faqlist2mysql(self, robotId=6901):
        '''Sync the question list into the database (categories first!).'''
        df = self.faqlist(robotId)
        df.insert(15, 'answer_id',
                  df["answer"].apply(lambda x: x.get("id") if
                                     isinstance(x, dict) else None))
        df["answer"] = df["answer"].apply(
            lambda x: x.get("content") if isinstance(x, dict) else None)

        # flatten the suggestion list to a comma-joined string
        df["suggestList"] = df["suggestList"].apply(
            lambda x: ",".join(x) if isinstance(x, list) else x)
        df = self.df2str(df)
        ctg = self.read_sql("categories")
        ctg.index = ctg["id"]
        df.insert(4, 'category',
                  df["categoryId"].apply(lambda x: ctg.loc[x, "name"]))
        df = self.auto_dtype(df)
        self.new("questions", df, key="id",
                 map_dtype={"answer": sql.TEXT(), "wechatUrl": sql.TEXT()})

    def sessions(self, start_time=None, end_time=None,
                 keywords=None, keywordsSource=None,
                 conditionList=None, robotId=6901):
        '''Fetch session records, paging via self.pageNum.'''
        if conditionList is None:
            # avoid a mutable default argument; same default as before
            conditionList = [self.Default_condition]
        new_url = self._api_url("v1/sessions")
        params = self.kmauth()
        data = {}
        pageNum = self.pageNum
        pageSize = 100
        data = self.update(data, "robotId", robotId)
        data = self.update(data, "startTime", start_time)
        data = self.update(data, "endTime", end_time)
        data = self.update(data, "pageNum", pageNum)
        data = self.update(data, "pageSize", pageSize)
        data = self.update(data, "keywords", keywords)
        data = self.update(data, "keywordsSource", keywordsSource)
        data = self.update(data, "conditionList", conditionList)
        content = self.url(new_url, params, wait=self.wait,
                           method="post", data=data)
        total = content["paging"]["total"]
        res = pd.DataFrame(content["data"])
        res = self.auto_dtype(res)
        if total - pageNum * pageSize > 0:
            self.pageNum += 1
            res = pd.concat([res, self.sessions(start_time, end_time,
                                                keywords, keywordsSource,
                                                conditionList, robotId)])
        else:
            self.pageNum = 1
        return res

    def sessions2mysql(self, start_time=None, end_time=None,
                       save_log=False, robotId=6901):
        '''
        Sync session records into the database.
        save_log: also sync each session's chat transcript.
        '''
        df = self.sessions(start_time, end_time, robotId=robotId)
        df.insert(19, 'customer_id',
                  df.apply(lambda x: x.loc["customer"]["id"], axis=1))
        df["customer"] = df.apply(lambda x: x.loc["customer"].get("email"),
                                  axis=1)
        df.drop('im', axis=1, inplace=True)
        df = self.df2str(df)
        if not self.table_exists("sessions"):
            self.new("sessions", df, key="id")
        else:
            self.to_sql("sessions", df, if_exists="append")
        if save_log:
            if self.table_exists("chatlog"):
                for session_id in df["id"]:
                    self.updatechatlog(session_id)
            else:
                # bootstrap the chatlog table from the first session,
                # then sync the rest
                df2 = self.chatlog(df["id"].iloc[0])
                df2 = self.df2str(df2)
                self.new("chatlog", df2, key="iid")
                for session_id in df["id"]:
                    self.updatechatlog(session_id)

    def chatlog(self, session_id):
        '''Fetch one session's chat transcript, paging via self.pageNum.'''
        new_url = self._api_url(f"v1/sessions/{session_id}/logs")
        params = self.kmauth()
        pageNum = self.pageNum
        pageSize = 100
        params = self.update(params, "pageNum", pageNum)
        params = self.update(params, "pageSize", pageSize)
        content = self.url(new_url, params, wait=self.wait, method="get")
        total = content["paging"]["total"]
        res = pd.DataFrame(content["data"])
        # synthesize a unique primary key: "<session_id>-<log_id>"
        res.insert(0, "iid",
                   res["id"].apply(lambda x: str(session_id)+"-"+str(x)))
        res = self.auto_dtype(res)
        if total - pageNum * pageSize > 0:
            self.pageNum += 1
            res = pd.concat([res, self.chatlog(session_id)])
        else:
            self.pageNum = 1
        return res

    def chatlog2mysql(self, session_id):
        '''Sync one session's chat transcript into the database.'''
        df = self.chatlog(session_id)
        df = self.df2str(df)
        if not self.table_exists("chatlog"):
            self.new("chatlog", df, key="iid")
        else:
            self.to_sql("chatlog", df, if_exists="append")

    def updatechatlog(self, session_id):
        '''Sync a session's transcript only if it is not in the DB yet.'''
        query = sql.text("SELECT id FROM chatlog WHERE sessionId = :content")
        # see updatectg: query the connection directly for a result object
        res = self.con.execute(query, {"content": session_id}).fetchall()
        if not res:
            self.chatlog2mysql(session_id)

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from KMRobot import KMRobot


robot = KMRobot()
# Sync the question categories.
robot.ctg2mysql()
# Sync the question list; NOTE: categories must be synced first.
robot.faqlist2mysql()
# Sync session records from 2024-05-01 to 2024-05-05 into the database,
# and also save the associated chat transcripts.
start_time="2024-05-01 00:00:00"
end_time="2024-05-05 00:00:00"
robot.sessions2mysql(start_time, end_time, save_log=True)
# Add a question category.
robot.postctg("测试")