背景
用过微信网页版的人应该都清楚网页登陆的流程,大致描述一下这个过程:
- 打开网页版登陆链接
- 页面会显示一个二维码
- 用微信客户端扫描二维码,让用户确认登陆网页版
- 如果确认登陆,网页版会自动进入聊天界面
这个过程的交互方式和一般的WEB应用不太一样,步骤4
网页自动跳转,明显是由服务端主动推送了内容给网页端,网页端收到跳转确认后才触发的,这里就引出了今天要讨论的问题:服务端推送技术
.服务端推送又称为Comet,服务端异步处理等.很早以前就出现了,但一直没有一个统一的标准,存在着不少Comet技术框架,各个Web容器也各自实现了自己的Comet支持.最近公司的产品也出现了和微信网页版登陆类似的场景,需要用到Comet技术,我简单的研究了下,写下来记录一下.
解决方案
Tomcat内置
omcat 内置支持,需要实现CometProcessor接口.但是应用就依赖Tomcat容器了, 由于依赖性过大, 暂不考虑.
Servlet3新特性
Servlet3提供一套完整的异步处理API,包括AsyncContext,AsyncLiseter,AsyncEvent. 要求Tomcat7.0++.
SpringMVC3.2
SpringMVC3.2 在Servlet3的基础上做了进一步的封装,编码更为简单,提供Callable,WebAsyncTask,DeferredResult三种方式进行异步编程支持,非常方便.
应用场景
扫描动态二维码关注微信公众账号.
流程
- 客户端调用服务端接口获取动态二维码以及二维码内容中内置的ID.(这个时候在客户端能看到一个二维码了,等待用户扫描)
- 客户端马上调用服务端的一个长连接接口,与服务端建立长连接,等待服务端通知.(这个过程是在后台发生的,用户无法感知)
- 用户拿出微信扫描二维码,就会有一个扫描事件通知到服务端的扫描接口.(这个时候服务端接收到扫描动作,完成自己的业务操作以后,通知长连接接口,用户已经扫描了,可以返回了).
关键点
- 步骤2里面要求客户端–服务端建立长连接,不会立即返回,客户端一直在等待状态.(Servlet3 的API可以支持,需要把Timeout时间设置长一点,一般是60S够了)
- 步骤3中 扫描接口要通知长连接接口,如何做到? 必须存在一个公共的容器,容器里面存着上下文信息,扫描接口把执行完毕的上下文告知长连接接口就可以了.
实现
配置web.xml的命名空间
1 2 3 4 5 6
| <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0"> </web-app>
|
配置tomcat
1 2
| <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" asyncTimeout="150000" URIEncoding="utf-8" redirectPort="8443" />
|
Servlet新特性
实现长连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| @WebServlet(value = "/scan/*",asyncSupported = true) public class ScanServlet extends HttpServlet { private Logger logger = Logger.getLogger(getClass()); @Override public void init() throws ServletException {
}
@Override public void destroy() { ScanRetain.MAP.clear(); }
@Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { doPost(req, resp); }
@Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { logger.debug(">>>>>>>>>>>>>>>>>开始访问长连接Servlet....."); String pathInfo = req.getPathInfo(); String key = null; if (pathInfo != null) { int i = pathInfo.lastIndexOf('/'); if (i >= 0) { key = pathInfo.substring(i + 1); } } if (key == null) { PrintWriter writer = resp.getWriter(); writer.write("error:not found scan key"); writer.flush(); return; } req.startAsync(req, resp); if (req.isAsyncStarted()) { final AsyncContext asyncContext = req.getAsyncContext(); final String theKey = key; asyncContext.setTimeout(60 * 1000L);
asyncContext.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent asyncEvent) throws IOException { ScanRetain.MAP.remove(theKey); }
@Override public void onTimeout(AsyncEvent asyncEvent) throws IOException { ScanRetain.MAP.remove(theKey); }
@Override public void onError(AsyncEvent asyncEvent) throws IOException { ScanRetain.MAP.remove(theKey); }
@Override public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
} });
logger.debug(">>>>>>>>>>>>>>>>>将长连接上下文对象加入队列等待处理........."); ScanRetain.MAP.put(theKey, asyncContext); } } }
|
公共Context容器存放类以及提供给扫描后对长连接响应处理的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class ScanRetain {
public static final ConcurrentHashMap<String, AsyncContext> MAP = new ConcurrentHashMap<String, AsyncContext>();
private Logger logger = Logger.getLogger(getClass());
public void doReturn(String key){ logger.debug(">>>>>>>>>>>>>>>>>长连接正在响应....."); AsyncContext asyncContext = MAP.get(key); if (asyncContext == null) { return; } HttpServletResponse res = (HttpServletResponse) asyncContext.getResponse(); DBObject data = new BasicDBObject("result",1) .append("info","ok") .append("now",System.currentTimeMillis()); String str = JSON.serialize(data); OutputStream os = null; try { os = res.getOutputStream(); os.write(str.getBytes("utf-8")); logger.debug(">>>>>>>>>>>>>>>>>长连接响应完毕....."); os.flush(); asyncContext.setTimeout(100L); } catch (IOException e) { e.printStackTrace(); } } }
|
扫描事件触发长连接响应的逻辑
1 2 3 4 5
| Long senceId = 0L; if (qrSenceId != null) { senceId = Long.parseLong(qrSenceId); } scanRetain.doReturn(senceId + "");
|
SpringMVC3.2
Spring的代码实现简单很多,把Servlet方案中的第二步和第三步合成了一步,但是也不那么直观,不利于理解.
实现长连接
1 2 3 4 5 6 7 8 9 10
| // 上下文容器 public static final ConcurrentHashMap<String, DeferredResult<String>> MAP = new ConcurrentHashMap<String, DeferredResult<String>>();
@RequestMapping("doScan/{key}") @ResponseBody public DeferredResult<String> doScan(@PathVariable("key") String key) { DeferredResult<String> result = new DeferredResult<String>(); MAP.put(key, result); return result; }
|
扫描事件触发长连接响应的逻辑
1 2 3 4 5 6 7 8 9 10 11
| @RequestMapping(value="/newScan/{key}",produces = "text/plain;charset=utf-8;") @ResponseBody public String newScan(@PathVariable("key") String key, HttpServletRequest req, HttpServletResponse res) { DeferredResult<String> data = Scans.MAP.get(key); if(data!=null){ data.setResult("this is result:"+System.currentTimeMillis()); Scans.MAP.remove(key); } return "new scan test finished :"+key+"now is :"+System.currentTimeMillis(); }
|
异步处理方式, 不适用于该场景
异步操作比较适用于一些比较耗时的操作(如大数据计算,文件处理),它们的响应一般不存在其他的触发点,就是取决于Callable内部代码块的执行结束.
Callable:
1 2 3 4 5 6 7 8 9 10 11
| @ResponseBody @RequestMapping("call") public Callable<String> call(HttpServletRequest req, HttpServletResponse res) throws Exception { return new Callable<String>() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(5); return "hello,callable"; } }; }
|
WebAsyncTask:
1 2 3 4 5 6 7 8 9 10 11 12
| @ResponseBody @RequestMapping("async") public WebAsyncTask<String> async(HttpServletRequest req, HttpServletResponse res) throws Exception { Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(5); return "hello,WebAsyncTask"; } }; return new WebAsyncTask<String>(1000*60L,callable); }
|
注意
如果在web.xml中配置了其它的servlet或filter, 必须在他们的根标签中增加支持异步操作标签<async-supported>true</async-supported>
.
总结
综上,我们大致可以总结出异步处理的两种应用场景:
多点操作,单点的响应往往依赖于其他点的触发,最典型的就是微信扫描登录了.这个基本的编码思路应该是这样的:
- 定义一个上下文存储容器,容器要支持并发,最好选用Concurrent类型.
- 开发长连接接口,客户端请求连接后,将上下文加入存储容器.
- 开发响应的触发逻辑代码段.
- 触发业务完成以后,调用响应触发逻辑.
单点操作,但是操作往往非常耗时,不能及时响应.这种场景一般会把耗时操作全部抽离到Callable代码段,响应的触发点就是Callable代码的结束处.