- 1. 首先,这是一个nio的框架,仍然是采用reactor模式,知道这一点后,那么编程就没有什么难的。nio的编程,无外乎就是这些套路, 再进一步说,网络编程,也就是这些套路了。
- 2. 那么剩下编程的注意点,也就是编解码的处理以及最后的业务逻辑的处理。
2.1 编解码的注意点:因为在网络编程中,client和server之间,往往需要完整的接收到一条消息的后,才交给业务逻辑处理。具体可以参看http://jimmee./blog/617544,其中,我们常常是继承自CumulativeProtocolDecoder来实现自己的解码器,主要的docode的方法,其作用是将本次数据和上次接收到的数据(如果doDecode方法没有处理完的话),统一放到一个buffer中,之后扔给 doDecode方法处理,若处理完之后,还有剩下的数据,则继续缓存。
-
-
-
-
-
-
-
-
-
- public void decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- if (!session.getTransportMetadata().hasFragmentation()) {
- while (in.hasRemaining()) {
- if (!doDecode(session, in, out)) {
- break;
- }
- }
-
- return;
- }
-
- boolean usingSessionBuffer = true;
- IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
-
-
- if (buf != null) {
- boolean appended = false;
-
- if (buf.isAutoExpand()) {
- try {
- buf.put(in);
- appended = true;
- } catch (IllegalStateException e) {
-
-
- } catch (IndexOutOfBoundsException e) {
-
- }
- }
-
- if (appended) {
- buf.flip();
- } else {
-
-
- buf.flip();
- IoBuffer newBuf = IoBuffer.allocate(
- buf.remaining() + in.remaining()).setAutoExpand(true);
- newBuf.order(buf.order());
- newBuf.put(buf);
- newBuf.put(in);
- newBuf.flip();
- buf = newBuf;
-
-
- session.setAttribute(BUFFER, buf);
- }
- } else {
- buf = in;
- usingSessionBuffer = false;
- }
-
-
-
- for (;;) {
- int oldPos = buf.position();
- boolean decoded = doDecode(session, buf, out);
- if (decoded) {
- if (buf.position() == oldPos) {
- throw new IllegalStateException(
- "doDecode() can't return true when buffer is not consumed.");
- }
-
- if (!buf.hasRemaining()) {
- break;
- }
- } else {
- break;
- }
- }
-
-
-
-
- if (buf.hasRemaining()) {
- if (usingSessionBuffer && buf.isAutoExpand()) {
- buf.compact();
- } else {
- storeRemainingInSession(buf, session);
- }
- } else {
- if (usingSessionBuffer) {
- removeSessionBuffer(session);
- }
- }
- }
可以看一个具体的实现:PrefixedStringDecoder的 doDecode方法,这个decode是消息长度+具体字节流的消息格式。
- protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
- if (in.prefixedDataAvailable(prefixLength, maxDataLength)) {
- String msg = in.getPrefixedString(prefixLength, charset.newDecoder());
- out.write(msg);
- return true;
- }
-
- return false;
- }
其中, prefixedDataAvailable方法是判断得到IoBuffer里的数据是否满足一条消息了,如果再进去看这个方法的实现化,读取是,使用ByteBuffer的绝对位置的读取方法(这种读取不影响position的值的);当已经有一条完整的消息时,则用getPrefixedString读取(使用的是ByteBuffer的相对位置的读取方法,这会影响position的值,从而实际的消费掉数据).
2.2 业务逻辑的处理注意点,一般都会使用一个线程池处理。这里就有一个问题,有时候同一个连接的消息处理,是希望按照顺序来进行的。这也很简单,不需要保证一个连接的所有的业务处理都限定在一个固定的线程中,但是需要保证当有消息需要处理时,这些消息的处理,都在同一个线程中完成。当然了,mina中也有了相应的实现OrderedThreadPoolExecutor。实现原理很简单: 一个连接(session)对应一个消息队列,同时此session也放到一个队列中,说明这个session有消息需要处理。具体来说,就是使用SessionTasksQueue来表示一个Session的要处理的消息的队列; 使用
-
- private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
waitingSessions表示有消息需要处理的session的队列。
-
-
-
- @Override
- public void execute(Runnable task) {
- if (shutdown) {
- rejectTask(task);
- }
-
-
- checkTaskType(task);
-
- IoEvent event = (IoEvent) task;
-
-
- IoSession session = event.getSession();
-
-
- SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
- Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
-
- boolean offerSession;
-
-
-
-
- boolean offerEvent = eventQueueHandler.accept(this, event);
-
- if (offerEvent) {
-
- synchronized (tasksQueue) {
-
- tasksQueue.offer(event);
-
-
-
- if (sessionTasksQueue.processingCompleted) {
- sessionTasksQueue.processingCompleted = false;
- offerSession = true;
- } else {
- offerSession = false;
- }
-
- if (LOGGER.isDebugEnabled()) {
- print(tasksQueue, event);
- }
- }
- } else {
- offerSession = false;
- }
-
- if (offerSession) {
-
-
-
- waitingSessions.offer(session);
- }
-
- addWorkerIfNecessary();
-
- if (offerEvent) {
- eventQueueHandler.offered(this, event);
- }
对应的,看一下:
- private void runTasks(SessionTasksQueue sessionTasksQueue) {
- for (;;) {
- Runnable task;
- Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
-
- synchronized (tasksQueue) {
- task = tasksQueue.poll();
-
- if (task == null) {
- sessionTasksQueue.processingCompleted = true;
- break;
- }
- }
-
- eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
-
- runTask(task);
- }
- }
小结:个人认为,只要对nio编程熟悉,对mina框架,只要明白了以上两点,就不再有什么大问题了。
|