Telegram's built-in Chinese search has always been poor, and general search engines cannot index Telegram content. Someone has already built a dedicated Telegram search engine, but it only covers a subset of groups and not the ones I care about, so I spent some time putting together an ELK setup to index chat history myself.

(For learning and discussion only; do not use this for anything that violates laws or regulations.)

Telegram Chinese search

docker-elk

Starting with docker-compose

The fastest way to install ELK is with Docker. Elastic publishes official images, and the docker-elk project composes them, so the whole stack can be started with docker-compose:

git clone https://github.com/deviantony/docker-elk
cd docker-elk
docker-compose up -d

Pulling the images may take a while the first time. Once everything is up, verify the installation by opening the Kibana console at http://127.0.0.1:5601 in a browser.
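The same check can also be scripted. Below is a stdlib-only sketch that builds a basic-auth request against the Elasticsearch REST API on port 9200; `es_request` is a hypothetical helper name, and the host, port, and credentials in the usage comment are assumptions for a default docker-elk setup:

```python
import base64
import urllib.request


def es_request(url, user, password):
    """Build a basic-auth request for the Elasticsearch REST API."""
    token = base64.b64encode(f'{user}:{password}'.encode()).decode()
    req = urllib.request.Request(url)
    req.add_header('Authorization', f'Basic {token}')
    return req


# e.g.: urllib.request.urlopen(es_request('http://127.0.0.1:9200', 'elastic', 'changeme'))
```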

Resetting the passwords

After installation, change the default passwords to avoid trouble later. Reset the default passwords of the built-in service accounts:

docker-compose exec -T elasticsearch bin/elasticsearch-setup-passwords auto --batch
Changed password for user apm_system
PASSWORD apm_system = rcTzsAV

Changed password for user kibana_system
PASSWORD kibana_system = awWKyX2H

Changed password for user kibana
PASSWORD kibana = awWKyX2Hx

Changed password for user logstash_system
PASSWORD logstash_system = zXhnmei

Changed password for user beats_system
PASSWORD beats_system = UQtnmc2Fi

Changed password for user remote_monitoring_user
PASSWORD remote_monitoring_user = Z6E9AJR

Changed password for user elastic
PASSWORD elastic = gkKDmNLRl

(The passwords above are examples only.)

After the reset, update the configuration files pulled from git accordingly. Three files are involved:

docker-elk/logstash/config/logstash.yml
docker-elk/logstash/pipeline/logstash.conf
docker-elk/kibana/config/kibana.yml

Update the usernames and passwords in these files to match the command output shown above:

  • docker-elk/kibana/config/kibana.yml

    ---
    server.name: kibana
    server.host: 0.0.0.0
    elasticsearch.hosts: [ "http://elasticsearch:9200" ]
    monitoring.ui.container.elasticsearch.enabled: true

    ## X-Pack security credentials
    #
    elasticsearch.username: kibana_system
    elasticsearch.password: awWKyX2H

    i18n.locale: "zh-CN"
  • docker-elk/logstash/config/logstash.yml

    ---
    http.host: "0.0.0.0"
    xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]

    ## X-Pack security credentials
    #
    xpack.monitoring.enabled: true
    xpack.monitoring.elasticsearch.username: logstash_system
    xpack.monitoring.elasticsearch.password: zXhnmei
  • docker-elk/logstash/pipeline/logstash.conf

    input {
      beats {
        port => 5044
      }

      tcp {
        port => 5000
      }
    }

    ## Add your filters / logstash plugins configuration here

    output {
      elasticsearch {
        hosts => "elasticsearch:9200"
        user => "elastic"
        password => "gkKDmNLRl"
        ecs_compatibility => disabled
      }
    }

Then run docker-compose restart to apply the changes.

Logging in to Kibana

URL: http://127.0.0.1:5601

Username: elastic

Password: gkKDmNLRl

Importing Telegram chat history

Fetching the chat history

Registering an application

Visit https://my.telegram.org/apps in a phone browser (registering from a desktop browser can fail with an error; the cause is unclear, possibly an ad blocker or a proxy) and register an application. The URL field can be left empty.

All you need from it are the api_id and api_hash.

API wrapper

Telegram exposes many APIs over its own proprietary protocol, which would be fairly complex to implement from scratch, so it is easier to use an existing wrapper. After looking around, Telethon seems the most complete: LonamiWebs/Telethon: Pure Python 3 MTProto API Telegram client library, for bots too!

The optional packages mentioned in the documentation are worth installing, mainly to speed things up:

Installation — Telethon 1.23.0 documentation

Logging in

from telethon import TelegramClient
self.client = TelegramClient('anon', api_id, api_hash, proxy=("http", '127.0.0.1', 8118))

To access the API through a proxy as shown here, follow the tutorial below to install the required dependencies:

https://docs.telethon.dev/en/latest/basic/signing-in.html?highlight=proxy#signing-in-behind-a-proxy
The first argument makes Telethon create an anon.session file in the current working directory, so after the first login no further sign-in is needed.

Fetching a channel's chat history

async def load_history_to_save(self):
    entity = await self.client.get_entity(channel_id)
    async for message in self.client.iter_messages(entity, reverse=True, offset_id=latest_id, limit=None):
        print(message.text)

This retrieves all chat messages of a given channel.
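As I read the Telethon docs, reverse=True walks messages oldest to newest, and offset_id skips everything up to and including that id, which is what makes resuming from a saved id possible. Roughly, as a pure stdlib sketch over plain message ids (ids_after_checkpoint is a hypothetical name):

```python
def ids_after_checkpoint(message_ids, offset_id):
    """Mimic iter_messages(reverse=True, offset_id=...): ids above the checkpoint, oldest first."""
    return [m for m in sorted(message_ids) if m > offset_id]


print(ids_after_checkpoint([5, 3, 9, 7], 5))  # [7, 9]
```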

Storing in Elasticsearch

Operating Elasticsearch from Python

Use the official Python client library for Elasticsearch:

elastic/elasticsearch-py: Official Elasticsearch client library for Python

from elasticsearch import Elasticsearch
self.es = Elasticsearch('localhost', http_auth=('elastic', 'gkKDmNLRl'))

Saving to Elasticsearch

First, format the message obtained above to extract the fields we want:

from telethon.utils import get_display_name


async def format_message(self, message, chat_id):
    if not message.text or message.media:
        return None

    chat = await message.get_chat()
    chat_display_name = get_display_name(chat)
    sender_user = await message.get_sender()
    doc_data = {
        "timestamp": message.date,
        "sender": {
            "username": getattr(sender_user, 'username', ''),
            "firstName": getattr(sender_user, 'first_name', ''),
            "lastName": getattr(sender_user, 'last_name', '')
        },
        "channel": chat_display_name,
        "channel_id": chat_id,
        "text": self.converter.convert(message.text),
        "message_id": message.id
    }
    return doc_data
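One detail in the snippet above: getattr's default only applies when the attribute is missing entirely, not when it exists but is set to None. A quick stdlib illustration with a stub object (the stub and its field values are made up for illustration):

```python
from types import SimpleNamespace

# Hypothetical stub standing in for a Telethon sender object
sender = SimpleNamespace(username='alice', first_name='Alice', last_name=None)

print(getattr(sender, 'username', ''))   # 'alice'
print(getattr(sender, 'phone', ''))      # '' (attribute missing, default used)
print(getattr(sender, 'last_name', ''))  # None (attribute present, default ignored)
```

So fields that are present but None still end up as None in the document; if that matters for the index mapping, wrap the lookup in `or ''`.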

Then save it to Elasticsearch:

from elasticsearch import Elasticsearch
from loguru import logger

# key: channel_id, value: latest message id already saved to es
self.channel_dict = {
    -1001094615131: 0,  # archlinux-cn-offtopic
}


async def save_to_es(self, doc_data, message_id, channel_id):
    if not doc_data:
        return

    self.es.index(index='telegram', document=doc_data)
    # persist the latest processed id to a file
    if self.channel_dict[channel_id] < message_id:
        self.channel_dict[channel_id] = message_id
        FileDict.save_obj(self.channel_dict, 'latest_id')
    logger.info('saved: {}-{}-{}', doc_data["channel"], message_id, doc_data["timestamp"])

Here the latest id of each message saved to Elasticsearch is written to a file, updated after every record, so the script always knows how far it has already processed.
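The update step above boils down to a small pure function; a sketch, where update_checkpoint is a hypothetical name rather than something from the original code:

```python
def update_checkpoint(channel_dict, channel_id, message_id):
    """Advance the per-channel high-water mark; return True if it moved and should be persisted."""
    if channel_dict.get(channel_id, 0) < message_id:
        channel_dict[channel_id] = message_id
        return True
    return False


d = {-1001094615131: 10}
print(update_checkpoint(d, -1001094615131, 12))  # True, checkpoint advances to 12
print(update_checkpoint(d, -1001094615131, 11))  # False, stale id is ignored
```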

Persist the dict to a file with the standard library's pickle:

import os
import pickle


class FileDict:

    @staticmethod
    def get_file_path(name):
        return os.path.join('obj', name + '.pkl')

    @staticmethod
    def save_obj(obj, name):
        if not os.path.exists('obj'):
            os.makedirs('obj')
        with open(FileDict.get_file_path(name), 'wb') as f:
            pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def load_obj(name):
        with open(FileDict.get_file_path(name), 'rb') as f:
            return pickle.load(f)
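The pickle roundtrip FileDict relies on can be checked in isolation with the standard library alone (the path below is a throwaway temp file, not the real obj/ directory):

```python
import os
import pickle
import tempfile

checkpoint = {-1001094615131: 42}
path = os.path.join(tempfile.mkdtemp(), 'latest_id.pkl')

# Write the dict out, then read it back, exactly as FileDict does
with open(path, 'wb') as f:
    pickle.dump(checkpoint, f, pickle.HIGHEST_PROTOCOL)
with open(path, 'rb') as f:
    restored = pickle.load(f)

print(restored == checkpoint)  # True
```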

So when the script starts, it also needs to initialize the latest ids from that file:

from loguru import logger


def init_latest_channel_message_id(self):
    """Read the latest ids saved to es from the file and update the in-memory dict."""
    try:
        latest_dict = FileDict.load_obj('latest_id')
        for channel_id, _ in self.channel_dict.items():
            if channel_id in latest_dict:
                self.channel_dict[channel_id] = latest_dict[channel_id]
    except FileNotFoundError as e:
        logger.error(e)

Running it

Completing the code that pulls data from the API and saves it to Elasticsearch:

async def load_history_to_save(self):
    """Pull data from the API and save it to es."""
    for channel_id, latest_id in self.channel_dict.items():
        entity = await self.client.get_entity(channel_id)
        async for message in self.client.iter_messages(entity,
                                                       reverse=True, offset_id=latest_id, limit=None):
            doc_data = await self.format_message(message, channel_id)
            await self.save_to_es(doc_data, message.id, channel_id)

Start the script:

def start(self):
    with self.client:
        self.client.loop.run_until_complete(self.load_history_to_save())

Results