工作之外,总想折腾点什么东西。最近看朋友搞的QQ摸鱼机器人有点意思,就想自己也搞一个。tg提供的API比较全面,所以就想搞个tg版。
直接贴代码:https://github.com/rfkhx/starsea
基本消息处理和分发
其实仅就这个需求而言,并没有用多线程的必要性,一个死循环中拉取消息,拉到后交给对应模块处理,处理完继续下一次循环拉下一条消息就可以了。不过为了把代码结构写得更清楚一点,并希望能在这个过程中学到点东西,我决定仿照安卓的线程机制组织,将安卓中的 Handler
、Looper
移植到我的项目中,借助这个过程更深入理解整个机制。
类似安卓,我在后台线程进行网络请求,主线程进行结果的分发处理。
Message
Message
就是消息对象。我对消息对象的定义如下:
data class Message(
val what: Int,
val arg: Int = -1,
val payload: Any? = null,
var time: Long = 0L,
var target: Handler? = null,
val callback: Runnable? = null
)
要理解整个消息机制,还是得先看这几个参数。
- what 表示这是一条什么样的消息。一个
Handler
中可能定义和处理多种不同的消息,通过这个参数进行区分。 - arg、payload 表示消息携带的参数。
arg
是简单的,Int
类型的参数,而payload
则用于传递复杂对象。按需使用 - time 表示
Message
预期被处理的时间。只有到了这个时间之后消息才可以被取出处理。这对应我们经常用的handler.postDelayed
方法 - target 处理消息的
Handler
- callback 消息处理时执行特定回调。实际上和继承相关
Handler
并在相关方法中处理消息是等同的
MessageQueue
记得安卓中有个比较经典和基础的问题,主线程实际执行了 Looper.loop()
,为什么不会卡死?
首先我们看 Looper
的实现。
确认这里确实是个死循环,继续看单次循环 loopOnce
方法做了什么。
首先从 MessageQueue
中取到一条消息,一些判断后实际走了 Message
对应 Handler
来处理消息。似乎看不到哪里处理了这个问题,但我们可以在取消息处看到一条注释“might block”。
是的,这个队列有个特性,就是没有可用消息时,线程会阻塞在这里,就像我们用 Scanner
读用户输入一样,只有有了输入程序才会继续运行,所以不会占满CPU资源。而用户感受到的卡顿发生实际上是主线程堆积的消息过多,无法及时处理导致的,与这里用一个死循环处理消息没有直接关系。这里更多拓展探讨可以参考 每日一问 Looper.loop为什么不会阻塞掉UI线程?
幸运的是,java已经有具有这样功能的队列实现了,它就是 java.util.concurrent.DelayQueue
。只要让我们的 Message
类实现 Delayed
接口
data class Message(
val what: Int,
val arg: Int = -1,
val payload: Any? = null,
var time: Long = 0L,
var target: Handler? = null,
val callback: Runnable? = null
): Delayed {
override fun compareTo(other: Delayed?): Int {
val value1 = this.getDelay(TimeUnit.MILLISECONDS)
val value2 = (other?.getDelay(TimeUnit.MILLISECONDS)?: 0L)
return (value1 - value2).getSymbolInt()
}
override fun getDelay(unit: TimeUnit): Long {
val diffTime = time - System.currentTimeMillis()
return unit.convert(diffTime, TimeUnit.MILLISECONDS)
}
/*
* 安全地提取符号,将long转换成 -1 0 1 */ private fun Long.getSymbolInt(): Int {
val value = this
if (value == 0L) {
return 0
}
return (value / abs(value)).toInt()
}
}
然后我们就可以直接用这个 DelayQueue
作为我们的 MessageQueue
了。
class MessageQueue {
private val mQueue = DelayQueue<Message>()
fun enqueue(msg: Message) {
msg.time = 0L
mQueue.put(msg)
}
fun enqueue(msg: Message, time: Long) {
msg.time = time
mQueue.put(msg)
}
fun poll(): Message {
return mQueue.take()
}
fun clearMessages(handler: Handler) {
mQueue.removeIf { it.target == handler }
}
}
Looper
Looper
的核心代码抽离出来比较简单,我就直接贴出来了。 Looper
中持有 MessageQueue
,是整个消息循环的核心。
class Looper {
val mQueue = MessageQueue()
companion object {
private val sThreadLocal = ThreadLocal<Looper>()
private var sMainLooper: Looper? = null
fun myLooper() = sThreadLocal.get()
fun mainLooper() = sMainLooper!!
fun prepare() {
if (sThreadLocal.get() != null) {
throw IllegalStateException("looper prepared")
}
sThreadLocal.set(Looper())
}
fun prepareMainLooper() {
prepare()
synchronized(Looper::class) {
if (sMainLooper != null) {
throw IllegalStateException("main looper prepared")
}
sMainLooper = myLooper()
}
}
fun loop() {
val me = myLooper() ?: throw IllegalStateException("you should call prepare first")
while (true) {
val message = me.mQueue.poll()
message.target?.dispatchMessage(message)
}
}
}
}
这里唯一一个知识点就是 ThreadLocal
的使用。这里借用 廖雪峰的教程 中的一句话:“实际上,可以把ThreadLocal
看成一个全局Map<Thread, Object>
:每个线程获取ThreadLocal
变量时,总是使用Thread
自身作为key”。
每个线程 prepare
后都绑定了一个 Looper
,这个 Looper
就是放在 ThreadLocal
中的,此后我们在某一线程执行 sThreadLocal.get()
拿到的就是这个线程的 Looper
。
Handler
终于到了激动人心的时刻了,我们理到了最常使用和打交道的 Handler
。核心逻辑实际上要比我想象的简单。
open class Handler(looper: Looper? = null) {
companion object {
private val EMPTY_MESSAGE = Message(-1)
}
private var mLooper: Looper
private var mQueue: MessageQueue
private var mCallback: ((Message) -> Unit)? = null
init {
mLooper = looper ?: Looper.myLooper() ?: throw IllegalStateException("looper not prepared")
mQueue = mLooper.mQueue
}
fun sendMessage(msg: Message) = sendMessageDelay(msg, 0L)
fun sendMessageDelay(msg: Message, delay: Long) {
require(delay >= 0L) { "delay cannot be negative" }
msg.target = this
mQueue.enqueue(msg, System.currentTimeMillis() + delay)
}
fun sendEmptyMessage() = sendMessage(EMPTY_MESSAGE)
fun sendEmptyMessageDelayed(delay: Long) = sendMessageDelay(EMPTY_MESSAGE, delay)
fun post(block: () -> Unit) {
sendMessage(Message(0, callback = block))
}
fun postDelayed(block: () -> Unit, delay: Long) {
sendMessageDelay(Message(0, callback = block), delay)
}
fun dispatchMessage(message: Message) {
message.callback?.run() ?: mCallback?.invoke(message) ?: handleMessage(message)
}
fun clearMessages() {
mQueue.clearMessages(this)
}
protected open fun handleMessage(message: Message) {}
}
一个 Handler
对应一个 Looper
,sendMessage
实际上就是把对应的 Message
插入到了对应的 MessageQueue
中。而后阻塞在 Looper.loop
取下一条消息位置的线程被唤醒,并根据 Message.target
找到了发送消息的 Handler.dispatchMessage
来执行,而后实际执行对应callback或handleMessage方法。
Kotlin协程兼容
根据我们前面的设计,我们有主线程和后台线程之分。Kotlin协程的实现上, Dispatchers.IO
实际上是个未设置上限的线程池,但 Dispatchers.Main
可是需要切换回主线程来执行对应代码的。我们怎么做这里的兼容,使之能切换回我们定义的“主线程”呢?
继续看代码,通过SPI加载了MainDispatcherFactory。
继续找,很快找到了安卓对应的实现
其实就是将对应的代码post到了主线程去处理。所以仿照它,我的“青春版”实现也有了
@OptIn(InternalCoroutinesApi::class)
@AutoService(MainDispatcherFactory::class)
class MainDispatcherFactoryImpl: MainDispatcherFactory {
override val loadPriority: Int
get() = 0
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
return MainDispatcher()
}
}
class MainDispatcher: MainCoroutineDispatcher() {
private val handler = Handler(Looper.mainLooper())
override val immediate: MainCoroutineDispatcher
get() = this
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.sendMessage(Message(0, callback = block))
}
}
消息的拉取
长轮询的实现
tg的消息都是由一个叫做 getUpdate
的接口拉取的,而通过这个接口获取消息要做长轮询。
长轮询是什么?与普通的轮询方式不同,长轮询在发出请求后如果没有消息返回,服务端将不会立即返回,而是等有消息或超时再返回,而后客户端发起下一个请求。这一是能在有消息到达时让客户端及时收到通知,二是在没有消息时不会浪费太多无用请求。
我的网络请求部分是 Retrofit
,我发现单独给某个请求设置不同的超时时间还挺麻烦的。我参考 这里的讨论 总结如下:
首先将 OkHttp
的超时时间设置为一个很大的值,保证不会影响到我们后面的设置。
private val okHttpClient: OkHttpClient by lazy {
OkHttpClient.Builder()
.connectTimeout(1L, TimeUnit.DAYS) // 超时相关逻辑由 [top.ntutn.starsea.network.TimeoutCallAdapterFactory] 控制
.readTimeout(1L, TimeUnit.DAYS)
.writeTimeout(1L, TimeUnit.DAYS)
.build()
}
声明一个注解,用于指定单个连接的超时时间。
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER)
annotation class Timeout(val value: Long, val unit: TimeUnit)
创建 Retrofit
对象时,添加一个 CallAdapterFactory
,读取注解设置超时时间。
val retrofit: Retrofit by lazy {
Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(KotlinSerializeUtil.json.asConverterFactory(MediaType.parse("application/json")!!))
.addCallAdapterFactory(TimeoutCallAdapterFactory())
.client(okHttpClient)
.build()
}
class TimeoutCallAdapterFactory : CallAdapter.Factory(), LoggerOwner by slf4jLoggerOwner<TimeoutCallAdapterFactory>() {
override fun get(returnType: Type, annotations: Array<out Annotation>, retrofit: Retrofit): CallAdapter<*, *>? {
val timeout = annotations.firstOrNull { it is Timeout } as? Timeout
val delegate = retrofit.nextCallAdapter(this, returnType, annotations)
if (getRawType(returnType) != Call::class.java) {
return null
}
return object : CallAdapter<Any, Call<Any>> {
override fun responseType(): Type {
return delegate.responseType()
}
override fun adapt(call: Call<Any>): Call<Any> {
val path = call.request().url().url().path
if (timeout != null) {
logger.debug("请求{}的超时时间被设置为{} {}", path, timeout.value, timeout.unit)
call.timeout().timeout(timeout.value, timeout.unit)
} else {
logger.debug("请求{}的超时时间被设置为默认值{} s", path, RetrofitManager.DEFAULT_TIMEOUT.toLong())
call.timeout().timeout(RetrofitManager.DEFAULT_TIMEOUT.toLong(), TimeUnit.SECONDS)
}
return call
}
}
}
}
然后就可以借助注解便捷地指定某单个请求的超时时间了。
/**
* [Telegram Bot API](https://core.telegram.org/bots/api) */interface BotApi {
@GET("/bot{token}/getMe")
suspend fun getMe(@Path("token") token: BotToken): ResultWrapperBean<UserBean>
/**
* 获取更新消息,长连接
*/
@Headers(
"${RetrofitManager.READ_TIMEOUT}: 6000",
"${RetrofitManager.WRITE_TIMEOUT}: 6000",
"${RetrofitManager.CONNECT_TIMEOUT}: 6000"
)
@Timeout(value = 60L, unit = TimeUnit.SECONDS)
@GET("/bot{token}/getUpdates?timeout=30&allowed_updates=message,edited_message") // timeout单位秒
suspend fun getUpdates(@Path("token") token: BotToken, @Query("offset") offset: Long? = null): ResultWrapperBean<List<UpdateBean>>
}
消息循环实现
前面我们已经实现了安卓 Handler
机制的简单移植和协程的支持,这里就写个 Handler
来实现吧。
当开始获取消息时发送了一条 Message
,而后就开始执行单条消息的获取和处理了。注意不要捕获 CancellationException
时重试,否则job取消时就死循环了。
消息发送
简单消息的发送就不说了,唯一麻烦点的是用tg发送文件。接口定义:
interface BotApi {
/**
* Use this method to send photos. On success, the sent Message is returned. * @param chatId Unique identifier for the target chat or username of the target channel (in the format @channelusername)
* @param photo Photo to send. Pass a file_id as String to send a photo that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get a photo from the Internet. The photo must be at most 10 MB in size. The photo's width and height must not exceed 10000 in total. Width and height ratio must be at most 20. More information on Sending Files »
*/@POST("/bot{token}/sendPhoto")
@Multipart
suspend fun sendPhoto(@Path("token") token: BotToken, @Part("chat_id") chatId: RequestBody, @Part photo: MultipartBody.Part): ResultWrapperBean<MessageBean>
}
使用
fun String.toRequestBody(): RequestBody {
return RequestBody.create(MediaType.parse("text/plain"), this)
}
fun File.toMultiplePart(parameter: String): MultipartBody.Part {
return MultipartBody.Part.createFormData(parameter, name, RequestBody.create(MediaType.parse("multipart/form-data"), this))
}
BotScope.launch(Dispatchers.IO) {
kotlin.runCatching {
BotApi.get().sendPhoto(ConfigUtil.botToken, chatId.toRequestBody(), photoFile.toMultiplePart("photo"))
}.onFailure {
it.printStackTrace()
}
}
插件功能支持
因为 我朋友 已经做了一个QQ摸鱼机器人,所以我想可以做个插件支持,“为了这碟醋做顿饺子”,生命在于折腾嘛。
基本插件化原理
java本身有加载外部jar的 ClassLoader
—— URLClassLoader
,因此插件可以定义为单个外部jar包,插件与宿主之间借助SPI进行通信。
因而,我首先定义插件API:
/**
* 机器人功能接口,需要插件来实现
*/
interface BotContentProvider {
val pluginName: String
/**
* 插件加载事件
*/
fun onPluginLoaded() {}
fun onTextMessage(context: TextChatContext): Boolean = false
}
/**
* 机器人收到消息上下文对象,可以在此取到一些相关信息或进行一些操作
*/
interface ChatContext {
/**
* 发送方chat_id
*/ val chatId: String
/**
* 使用文本进行直接回复
* @param text 回复内容
*/
fun replyWithText(text: String)
}
/**
* 文本消息上下文
*/
interface TextChatContext: ChatContext {
/**
* 收到的文本内容
*/
val text: String
}
宿主通过implementation方式引入,插件通过compileOnly方式引入。当一条新消息到达时,宿主将调用插件的 onTextMessage()
方法,如果插件进行了处理则返回true,消息不再传递给下一个插件。
一个简单的EchoBot如下:
class EchoBot: BotContentProvider {
fun onTextMessage(context: TextChatContext) {
context.replyWithText(context.text)
}
}
插件版本兼容
然而这样的方案遇到一个问题,就是不同版本插件API的jar包不是ABI兼容的。如果API中新增了一个方法如 onPhotoMessage
,即使这个方法有默认实现,第三方插件也是要重新编译才能被我们的新版本机器人使用的。这是因为Kotlin的接口默认实现是编译期间的魔法,它将类未实现接口的默认方法指向一个叫 DefaultsImpls
的类。所以只有重新编译,第三方插件才能使用,因为他们没有实现我们新增的方法。
我目前的方法是,给接口包名中加入版本号,每次升级时复制一份。
然后加载插件时同时查找旧版本实现,通过适配器适配为新版本接口实现。
class BotContentProviderAdapter(private val adaptee: V1Adapter): V2Provider, LoggerOwner by slf4jLoggerOwner<BotContentProviderAdapter>() {
init {
logger.warn("{} 插件使用了旧版本API,现通过兼容方式装入,请联系开发者进行升级适配!", adaptee.pluginName)
}
override val pluginName: String = adaptee.pluginName
override fun onPluginLoaded() {
super.onPluginLoaded()
adaptee.onPluginLoaded()
}
override fun onTextMessage(context: ITextChatContext): Boolean {
return adaptee.onTextMessage(TextChatContextAdapter(context))
}
}
其实还是很麻烦,不知道有没有更简单的方法。
部署和运行
基本部署
使用 The Application Plugin 进行打包。tar包复制到服务器上,然后
tar -xvf ./starsea.tar
nohup starsea/starsea &
偶然崩溃自动重启
作为一个想要长期运行的机器人服务,偶尔崩掉一两次也是正常的,写一个脚本在程序崩溃时自动再次拉起来还是有必要的。当然我认为系统得至少稳定运行了一段时间,才能走崩溃自动拉起来的逻辑,否则万一发生了启动崩溃,拉起来就崩,崩了就拉起来,服务器:听我说谢谢你~
我的策略是,启动时脚本创建一个标志文件,系统稳定运行一段时间如5分钟后就把这个文件删掉。如果发生崩溃且未找到这个文件,脚本就尝试再次把机器人拉起来。
#!/bin/sh
#
# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
app_path=$0
# Need this for daisy-chained symlinks.
while
APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
[ -h "$app_path" ]
do
ls=$( ls -ld "$app_path" )
link=${ls#*' -> '}
case $link in #(
/*) app_path=$link ;; #(
*) app_path=$APP_HOME$link ;;
esacdone
APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
DEMAND_LOCK=demand_lock.lock
rm $DEMAND_LOCK
while true
do
if [ -f $DEMAND_LOCK ]
then
break
fi
touch $DEMAND_LOCK
echo "demand starting system..."
$APP_HOME/starsea # 启动一段时间后会删除这个文件 $DEMAND_LOCK echo "system exited."
done
echo "It seems that app exited in a short time. Please check and restart again."
然后在机器人启动时
// 运行5分钟后删除demand lock,这表示系统可以进入稳定运行阶段。如果刚启动就退出脚本不会重启程序
handler.postDelayed({
File("demand_lock.lock").takeIf { it.exists() }?.delete()
}, 5 * 60 * 1000)
主线程崩溃/长时间未响应检测
与安卓上不同,java默认一个线程的崩溃并不会导致整个程序的退出。于去我们的需求讲这并不合适。
按照我们现在的设计,主线程是相当重要的,崩溃或者严重卡顿是一定要退出的,否则都不会继续拉取和分发消息了,继续跑着也没有意义,不如直接崩掉,还能被我们上一步写的脚本重启。
现有比较流行的ANR检测方法可以给我提供不错的参考:
- 单独启动一个后台线程用于检测。
- 首先设置一个陷阱flag,称bomb,同时向主线程post一个job去移除这个flag。
- 后台线程休眠较长一段时间。
- 后台线程判断flag是否还在,如果主线程没能及时“拆弹”,就说明他已经挂了或者严重卡顿了,此时让系统挂掉就好了。
- 重复上面步骤。
最终这部分代码如下:
object ApplicationContext: Thread.UncaughtExceptionHandler, LoggerOwner by slf4jLoggerOwner<ApplicationContext>() {
private val handler by lazy { Handler(Looper.mainLooper()) }
var exiting = false
/**
* 将系统标记为稳定运行状态。demand_lock.lock文件是由启动脚本生成的。
*/
private fun markStable() {
File("demand_lock.lock").takeIf { it.exists() }?.delete()
}
fun init() {
// 主线程挂掉立即终止运行
Thread.setDefaultUncaughtExceptionHandler(this)
// 主线程长期无响应终止运行
var bomb: Boolean
thread {
Thread.currentThread().name = "main-watcher"
while (true) {
bomb = true
handler.post { bomb = false }
Thread.sleep(600_000)
if (bomb) {
logger.error("主线程长时间未响应!")
Thread.getAllStackTraces().forEach {
logger.error("dump thread {}, {}", it.key, it.value)
}
exitProcess(1)
}
}
}
// 运行5分钟后删除demand lock,这表示系统可以进入稳定运行阶段。如果刚启动就退出脚本不会重启程序
handler.postDelayed({
markStable()
}, 5 * 60 * 1000)
}
/**
* 将当前系统状态标记为稳定并停机。如果因为意外问题停机请exitProcess。
*/
fun shutdown(delay: Long = 3000L) {
markStable()
exiting = true
handler.postDelayed({
exitProcess(0)
}, delay)
}
override fun uncaughtException(thread: Thread, tr: Throwable) {
logger.error("exception in thread {}", thread, tr)
if (thread.name == "main") {
logger.error("Main thread crashed!")
exitProcess(1)
}
}
}
其他
Logger的封装
直接上代码。目的就是少写一点模板代码,没什么复杂的。
interface LoggerOwner {
val logger: Logger
}
class Slf4jLoggerOwner(private val tag: String) : LoggerOwner {
private val _logger by lazy {
LoggerFactory.getLogger(tag)
}
override val logger: Logger
get() = _logger
}
/**
* 创建logger对象。
* 用于方法委托使用。
*/
inline fun <reified T> slf4jLoggerOwner(): Slf4jLoggerOwner = Slf4jLoggerOwner(T::class.java.canonicalName?:"NO_NAME")
/**
* 创建logger对象
* 用于直接使用
*/
inline fun <reified T> slf4jLogger(): Logger = slf4jLoggerOwner<T>().logger
使用方法一(类文件中):
class Demo1: LoggerOwner by slf4jLoggerOwner<Demo1>() {
fun demo1() {
logger.info("demo 1")
}
}
使用方法2(单个方法中):
class Demo2
fun demo2() {
val logger = slf4jLogger<Demo2>()
logger.info("demo 2")
}
最后修改于 2022-08-30