当前版本封装了ktor,支持自定义消息转发、验证时长、自定义验证、自定义数据类型(真实数据、身份、凭证)、多个账户同时登陆处理,可拓展性强。
并整合成Spring-Boot-Starter。
在custom-ktor项目下找到:
TestServer.kt(服务端)
fun main(args: Array<String>) {
val serverSocketServer = ServerSocketServer<String, String, String>(DefaultServerSocketSession())
serverSocketServer.start()
}
TestClient.kt(客户端)
fun main(args: Array<String>) {
//客户端
val clientSocketServer = ClientSocketServer(
clientSocketSession = DefaultClientSocketSession(SimpleAuthPackPackageImpl("admin1", "ktor"))
)
//设置消息处理器
clientSocketServer.setReceiveHandler {
println(it)
}
//服务启动
clientSocketServer.start()
//开启新线程来模拟发送数据
thread {
val scanner = Scanner(System.`in`)
while (scanner.hasNext()) {
val message = StringBuilder(scanner.nextLine())
for (i in 1..5) {
message.append(message)
}
clientSocketServer.sendMessage(
DefaultProtocolImpl(FrameType.BROADCAST, SimpleBroadcastPackageImpl(message.toString()))
)
}
}
}
该starter支持注入指定类型的通信服务:字节数组、字符串
ktor:
enable: true #是否开启ktor-starter的自动配置 (默认开启)
mode: byte #服务模式 可选值:simple(字符串)、 byte (字节数组)
defaultPassWord: ktor #若没有配置认证处理器,默认的认证密码(默认密码:ktor)
controller: false #是否自定义转发,false全局广播消息,为true时请重写ServerSocketSession的onMessageCustom方法(默认false)
authTime: 1000 #等待验证时间,在指定时间没有发送权限验证包会自动放弃该连接(默认1000毫秒)
port: 9000 #服务绑定端口
配置
@SpringBootApplication
class SpringBootKtorApplication {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<SpringBootKtorApplication>(*args)
}
}
//mode: simple #字符串类型
@Autowired
private lateinit var serverSocketServer: ServerSocketServer<String, String, String>
//mode: byte #字节数组类型
//@Autowired
//private lateinit var serverSocketServer: ServerSocketServer<ByteArray, String, String>
@PostConstruct
fun success() {
serverSocketServer.start()
}
}
项目基于ktor的netty再封装,开发者只需要实现认证和转发就可以实现一个简单的通信模式。
重点说明一下:ktor封装的netty的方法是异步非阻塞式的,其原理是ktor使用了kotlin的协程(本质还是线程的上下文切换),使得单线程也能实现netty的selector模式,有效的利用了系统资源,且降低了并发编程难度。
更多使用文档请加入交流群:
目前实现了socket通信,下版本支持websocket。
Ktor 是一个使用强大的 Kotlin 语言在互联系统中构建异步服务器与客户端的框架。利用Ktor可以实现web服务器以及Socket周边的通信实现。
参考官网:https://ktor.kotlincn.net/
前置知识:IDEA Kotlin的函数式 Koltin协程 Socket通信
你可以使用gradle或者maven构建一个ktor应用,当然你可以在dockerhub上去找关于ktor的镜像,关于初学者建议使用前两个构建工具来搭建一个简单的ktor项目入门。
官网的web服务有例子,这便着重测试Socket编程(官网socket文档很垃圾)
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-server-netty</artifactId>
<version>${ktor_version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback_version}</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-server-core</artifactId>
<version>${ktor_version}</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-server-sessions</artifactId>
<version>${ktor_version}</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-websockets</artifactId>
<version>${ktor_version}</version>
</dependency>
这里我实现了一个基于netty的网络服务器,用来转发客户端的请求来实现网络通信,本来打算使用websocket实现(其实过),但由于考虑到websocket是socket的封装,性能毫无疑问的比socket低,对于游戏,fps,及时类的应用程序socket编程才是最佳之选。
通过Socket Server来维护各个Socket Client的连接对象,对数据交互IO进行处理。
传递对象避免出现粘包
//jvm对象序列化为字节数组
fun objectToByteArray(obj: Any): ByteArray {
val byteArrayOutputStream = ByteArrayOutputStream()
val objectOutputStream = ObjectOutputStream(byteArrayOutputStream)
objectOutputStream.writeObject(obj)
objectOutputStream.flush()
return byteArrayOutputStream.toByteArray()
}
//将字节数组反序列化为Protocol对象
@SuppressWarnings("unused")
fun <T : Serializable, T1 : Serializable, T2 : Serializable> byteArrayToObject(byteArray: ByteArray): Protocol<T, T1, T2>? {
val `in` = ByteArrayInputStream(byteArray)
val sIn = ObjectInputStream(`in`)
val protocol: Protocol<T, T1, T2>?
protocol = try {
sIn.readObject() as Protocol<T, T1, T2>?
} catch (e: Exception) {
e.printStackTrace()
null
}
return protocol
}
各个socket通信我们使用自定义协议来实现,避免粘包
data class Translation(var data: ByteArray? = null, var messageCurrentTime: Long? = null) : Serializable
fun main(args: Array<String>) {
//我们使用的Scanner在hasNext方法等待输入的时候是阻塞式的
//且输出操作的协程和读取操作的协程是在同一个协程上下文的,这就导致了该线程一直被Scanner的next方法阻塞
//协程调度器无法从阻塞的线程中再调度,也就是说输出操作的协程一直会阻塞读取操作的协程
//协程默认的上下文为当前线程(在这里是main线程),所以launch协程的默认调度上下文不符合我们的期望。
// launch是CoroutineScope的拓展函数,第一个参数指定是一个携程的上下文,不传默认就是当前线程
// public fun CoroutineScope.launch(
// context: CoroutineContext = EmptyCoroutineContext,
// start: CoroutineStart = CoroutineStart.DEFAULT,
// block: suspend CoroutineScope.() -> Unit
// )
// 这里我们new了两个单线程的协程上下文
// threadPoolOfInput为读取操作的协程的上下文
// threadPoolOfOutPut为输出操作的协程的上下文
// 他们其实是两个线程,也就是读取操作和写出操作是运行在两个独立的线程
// 所以他们不会互相阻塞对方
val threadPoolOfInput = newSingleThreadContext("input")
val threadPoolOfOutPut = newSingleThreadContext("output")
//启动一个阻塞式协程上下文构建器
runBlocking {
//在指定主机和端口来连接一个ServerSockert服务,等待服务端响应连接(异步非阻塞式)
val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress("127.0.0.1", 2323))
//打开socker套接字的输入流(后面的autoFlush参数为刷新缓冲区,不用再手动write完数据后手动调flush)
val output = socket.openWriteChannel(autoFlush = true)
//打开socker套接字的输入流
val input = socket.openReadChannel()
//读取操作的协程
launch(threadPoolOfInput) {
while (true) {
input.awaitContent()
if (input.availableForRead == 0) break
val availableLength = input.readInt()
val byteArray = ByteArray(availableLength)
input.readFully(byteArray)
val data = SerializableTool.ByteArrayToObject(byteArray) as Translation
println(data.data?.let { kotlin.text.String(it) } + ",当前消息延迟为:" +
"${System.currentTimeMillis() - (if (data.messageCurrentTime == null) 0 else data.messageCurrentTime)!!}")
}
}
//输出操作的协程
launch(threadPoolOfOutPut) {
val scanner = Scanner(System.`in`)
while (scanner.hasNext()) {
val objectToByteArray = SerializableTool.ObjectToByteArray(
Translation(
("来自客户端${socket.remoteAddress}的消息:" + scanner.nextLine()).toByteArray(Charsets.UTF_8),
System.currentTimeMillis()
)
)
output.writeInt(objectToByteArray.size)
output.writeFully(objectToByteArray)
}
}
}
}
fun main(args: Array<String>) {
//我们使用的Scanner在hasNext方法等待输入的时候是阻塞式的
//且输出操作的协程和读取操作的协程是在同一个协程上下文的,这就导致了该线程一直被Scanner的next方法阻塞
//协程调度器无法从阻塞的线程中再调度,也就是说输出操作的协程一直会阻塞读取操作的协程
//协程默认的上下文为当前线程(在这里是main线程),所以launch协程的默认调度上下文不符合我们的期望。
// launch是CoroutineScope的拓展函数,第一个参数指定是一个携程的上下文,不传默认就是当前线程
// public fun CoroutineScope.launch(
// context: CoroutineContext = EmptyCoroutineContext,
// start: CoroutineStart = CoroutineStart.DEFAULT,
// block: suspend CoroutineScope.() -> Unit
// )
// 这里我们new了两个单线程的协程上下文
// threadPoolOfInput为读取操作的协程的上下文
// threadPoolOfOutPut为输出操作的协程的上下文
// 他们其实是两个线程,也就是读取操作和写出操作是运行在两个独立的线程
// 所以他们不会互相阻塞对方
val threadPoolOfInput = newSingleThreadContext("input")
val threadPoolOfOutPut = newSingleThreadContext("output")
//启动一个阻塞式协程上下文构建器
runBlocking {
//在指定主机和端口来连接一个ServerSockert服务,等待服务端响应连接(异步非阻塞式)
val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress("127.0.0.1", 2323))
//打开socker套接字的输入流(后面的autoFlush参数为刷新缓冲区,不用再手动write完数据后手动调flush)
val output = socket.openWriteChannel(autoFlush = true)
//打开socker套接字的输入流
val input = socket.openReadChannel()
//如果是launch {
// ...
// }
//读取操作协程会被输出操作的协程中scanner的hasNext()一直阻塞,收不到服务端发来的消息
//读取操作的协程
launch(threadPoolOfInput) {
while (true) {
input.awaitContent()
if (input.availableForRead == 0) break
val availableLength = input.readInt()
val byteArray = ByteArray(availableLength)
input.readFully(byteArray)
val data = SerializableTool.ByteArrayToObject(byteArray) as Translation
println(data.data?.let { kotlin.text.String(it) } + ",当前消息延迟为:" +
"${System.currentTimeMillis() - (if (data.messageCurrentTime == null) 0 else data.messageCurrentTime)!!}")
}
}
//如果是launch {
// ...
// }
//输出操作的协程中scanner的hasNext()会一直阻塞读取操作协程,收不到服务端发来的消息
//输出操作的协程
launch(threadPoolOfOutPut) {
val scanner = Scanner(System.`in`)
while (scanner.hasNext()) {
val objectToByteArray = SerializableTool.ObjectToByteArray(
Translation(
("来自客户端${socket.remoteAddress}的消息:" + scanner.nextLine()).toByteArray(Charsets.UTF_8),
System.currentTimeMillis()
)
)
output.writeInt(objectToByteArray.size)
output.writeFully(objectToByteArray)
}
}
}
}
启动一个服务端,和三个客户端
客户端一发送"hello ktor!"
服务端接收到消息
客户端二接收到消息
客户端三接收到消息
大家忽略上面的延迟字样,第一次发消息延迟会比较高,因为会涉及第一次初始化,缓冲,建立流传输通道等操作,会后IO操作都是5ms左右。
ktor非常实用,小巧,对netty和jetty都进行了封装,在web方面也提供了websocket、Auth、JWT、Jackson、SSL实现,包括web周边的路由,CORS,模板渲染引擎(Freemarker,Themyleaf,Velocity)实现,很适合作为项目的脚手架。
推荐链接:
中国唯一 Google 官方认证 Android 和 Kotlin 双领域开发专家(GDE): https://space.bilibili.com/27559447?from=search&seid=18022887471961950104
ktor官网:https://ktor.kotlincn.net/
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。