我的kafka JAVA调试代码

发布时间:2021-11-30 14:26:26

仅供自己参考,别人可能看不懂。


kafka 是很好的供所有分析库从生产库多次提取数据的中转库,特别是kafka 0.9后出现的kafka connect,个人认为能作为实时的ETL工具。


另外,kafka和storm都是流,但kafka不处理数据,storm可在kafka的基础上处理数据。storm在原理上和hadoop的mapreduce差不多,都有map reduce的过程,只是hadoop处理完一次MR后,就会结束,但storm不会结束,除非手动kill。这篇介绍storm的文章不错:?http://os.51cto.com/art/201308/408739.htm


个人认为,对于每次都是处理结构化数据的工作,可以不用storm。


下面是kafka的java调试程序,含json处理。



/**
* Created by hadoop on 16-6-21.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
public class ceshi {

private final ConsumerConnector consumer;

private ceshi() {
Properties props = new Properties();
//zookeeper 配置
props.put("zookeeper.connect", "192.168.3.31:2181");

//group 代表一个消费组
props.put("group.id", "jd-group");

//zk连接超时
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
//序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");

ConsumerConfig config = new ConsumerConfig(props);

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}

void consume() {
Map topicCountMap = new HashMap();
topicCountMap.put("test1", new Integer(1));

StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

Map>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream stream = consumerMap.get("test1").get(0);
ConsumerIterator iterator = stream.iterator();
Map> maps;

while (iterator.hasNext()) {
//System.out.println(iterator.next().message());
try {
maps=new ObjectMapper().readValue(iterator.next().message(), Map.class);
System.out.println( (Object)(maps.get("xm")) );
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
// TODO Auto-generated method stub
//System.out.println("请输入一个正整");
new ceshi().consume();
}

}





相关文档

  • 成都高校教师职称论文
  • 《品德与社会》教学设计
  • 别去5厘米之外
  • 考察材料
  • 天猫魔投怎么投游戏
  • 从零开始搭建中小企业可用框架(0)
  • 2017年个体养老金缴纳标准
  • 高中美术教学心得体会
  • 2016中级统计师统计工作实务真题及答案
  • 2016科目三路考扣分标准
  • 世界上最大的鲸鱼
  • 别告诉他我还爱他歌词
  • 手机app怎么转到sd卡
  • 基层党支部建设包括哪几个方面的内容
  • 超寂寞的伤感说说
  • Linux 上操作 与BBBlack通过USB 共享网络
  • c++模板两个数的加法
  • 初三化学精选试题
  • 实现一个最简单的
  • 幼儿园小班母亲节活动《感恩母亲节》教案
  • 最大似然估计(MLE)的一些公式与定理(python实践)
  • 绝命毒师第五季 《绝命毒师》第五季:主创谈大结局
  • python和java面向对象区别,总结Java和Python面向对象的不同点
  • 擦肩而过的作文500字3篇
  • 无花果叶子治疗痔疮的方法,无花果叶子治疗痔疮用干的还是鲜的
  • 十堰特产小吃方便带的
  • 初中生物学习攻略
  • 做黑客必须熟悉这些CMD命令!!
  • 写动物的作文带评语
  • 园林设计中的水景设计
  • 猜你喜欢

  • 关于北京语言大学2020年10月10日考试安排的通知
  • 【工程表格模板】付款证明
  • 国际工程承包第一章概述
  • 企业年度培训计划书(修改)
  • 电脑部分软件不能上网如何解决
  • 2012年广东专插本高等数学考试大纲
  • 2019-2020年五年级科学下册 生物繁殖新技术1教案 青岛版
  • 2011年六盘水源禧 五金机电专业市场营销策略案
  • 简餐店的经营模式
  • 武汉泽迪建材有限责任公司企业信用报告-天眼查
  • 微信文章图文排版指南
  • 高中物理光的衍射 同步练*人教版第三册.doc
  • 2019年人事行政主管自我评价
  • 2018-2019-一年级六一儿童节小报图片大全-精选word文档 (7页)
  • 【推荐下载】高二化学上册9月份月考检测试题2014
  • 【精品】2016年湖北省黄冈市红安县典明中学八年级上学期期中数学试卷带解析答案
  • 私人订制婚礼方案
  • 新人教版高中化学必修二—生活中常见的有机物乙醇教学设计
  • 山东2020版高考历史(人教版)一轮复*课时规范练26中国*现代社会生活的变迁Word版含解析
  • 房地产九种售楼部模式各个阶段的账务和税务处理
  • 西师大版数学五年级下册《3.4 长方体、正方体的表面积》课时堂同步练*含答案
  • 【推荐】今天的旅游_作文-实用word文档 (1页)
  • 项目融资有哪些方式
  • 2019-2020学年九年级语文上册 综合性学*四 给教科书编者或课文作者写一封信导学案 语文版.doc
  • 幼儿园优秀教案手掌拓印仙人掌(大班美术)
  • 关于提升高职院校中层干部执行力的思考
  • 一个虐你千百遍的问题:“RPC好,还是RESTful好?”
  • 项目承包责任书
  • 三年级起步作文教学设计
  • 中英文中英文文献翻译-汽车发动机凸轮轴
  • 独山吉利厨具有限公司企业信用报告-天眼查
  • 实现正弦信号的采样与重构课程设计报告
  • 【精编范文】马年幼儿园毕业教师祝福语-word范文模板 (3页)
  • 送给老师的暖心生日祝福语
  • 模糊模式识别在导航系统性能综合评价中的应用研究
  • 【CN209741802U】一种水面漂浮垃圾清理船【专利】
  • 对基于Poser软件*台的体育运动技术仿真模型的研究
  • 简短对自己说的话励志
  • 第三章 班级日常活动的创意设计.
  • 食堂工人自我鉴定多篇新版
  • 深情感恩父母亲演讲稿
  • 购房公积金贷款的相关办理手续
  • 电脑版