初始化提交

This commit is contained in:
USER-20221017CE\Administrator
2022-11-01 12:14:54 +08:00
commit d31fad2aa9
1733 changed files with 370203 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
package com.tuling.tulingmall;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class TulingmallCanalApplication {
public static void main(String[] args) {
SpringApplication.run(TulingmallCanalApplication.class, args);
}
}

View File

@@ -0,0 +1,41 @@
package com.tuling.tulingmall.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.InetSocketAddress;
//@Configuration
//@EnableScheduling
//@EnableAsync
public class CanalProductConfig {
// @Value("${canal.server.ip}")
// private String canalServerIp;
//
// @Value("${canal.server.port}")
// private int canalServerPort;
//
// @Value("${canal.server.username:blank}")
// private String userName;
//
// @Value("${canal.server.password:blank}")
// private String password;
//
// @Value("${canal.product.destination}")
// private String destination;
//
// @Bean("productConnector")
// public CanalConnector newSingleConnector(){
// String userNameStr = "blank".equals(userName) ? "" : userName;
// String passwordStr = "blank".equals(password) ? "" : password;
// return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,
// canalServerPort), destination, userNameStr, passwordStr);
// }
}

View File

@@ -0,0 +1,41 @@
package com.tuling.tulingmall.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.InetSocketAddress;
@Configuration
@EnableScheduling
@EnableAsync
public class CanalPromotionConfig {
@Value("${canal.server.ip}")
private String canalServerIp;
@Value("${canal.server.port}")
private int canalServerPort;
@Value("${canal.server.username:blank}")
private String userName;
@Value("${canal.server.password:blank}")
private String password;
@Value("${canal.promotion.destination}")
private String destination;
@Bean("promotionConnector")
public CanalConnector newSingleConnector(){
String userNameStr = "blank".equals(userName) ? "" : userName;
String passwordStr = "blank".equals(password) ? "" : password;
return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,
canalServerPort), destination, userNameStr, passwordStr);
}
}

View File

@@ -0,0 +1,41 @@
package com.tuling.tulingmall.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.InetSocketAddress;
@Configuration
@EnableScheduling
@EnableAsync
public class CanalSecKillConfig {
@Value("${canal.server.ip}")
private String canalServerIp;
@Value("${canal.server.port}")
private int canalServerPort;
@Value("${canal.server.username:blank}")
private String userName;
@Value("${canal.server.password:blank}")
private String password;
@Value("${canal.seckill.destination}")
private String destination;
@Bean("secKillConnector")
public CanalConnector newSingleConnector(){
String userNameStr = "blank".equals(userName) ? "" : userName;
String passwordStr = "blank".equals(password) ? "" : password;
return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,
canalServerPort), destination, userNameStr, passwordStr);
}
}

View File

@@ -0,0 +1,87 @@
package com.tuling.tulingmall.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
@Slf4j
public class PromotionRedisKey {
@Value ("${namespace.promotion:prmtd}")
private String promotionNamespace;
@Value ("${promotion.brand:br}")
private String brand;
@Value ("${promotion.newProduct:np}")
private String newProduct;
@Value ("${promotion.recProduct:rp}")
private String recProduct;
@Value ("${promotion.homeAdvertise:hd}")
private String homeAdvertise;
@Value ("${promotion.seckill:sk}")
private String secKill;
private String brandKey;
private String newProductKey;
private String recProductKey;
private String homeAdvertiseKey;
private String secKillKey;
@PostConstruct
public void initKey(){
brandKey = promotionNamespace + "." + brand;
newProductKey = promotionNamespace + "." + newProduct;
recProductKey = promotionNamespace + "." + recProduct;
homeAdvertiseKey = promotionNamespace + "." + homeAdvertise;
secKillKey = promotionNamespace + "." + secKill;
StringBuilder logKeyStr = new StringBuilder();
logKeyStr.append("[品牌推荐redis主键=").append(brandKey)
.append("] [新品推荐redis主键=").append(newProductKey)
.append("] [人气推荐redis主键=").append(recProductKey)
.append("] [轮播广告redis主键=").append(homeAdvertiseKey)
.append("] [秒杀redis主键=").append(secKillKey)
.append("]");
log.info("促销系统Redis主键配置{}",logKeyStr);
}
public String getBrandKey() {
return brandKey;
}
public String getNewProductKey() {
return newProductKey;
}
public String getRecProductKey() {
return recProductKey;
}
public String getHomeAdvertiseKey() {
return homeAdvertiseKey;
}
public String getSecKillKey() {
return secKillKey;
}
@Value("${promotion.demo.allowLocalCache:true}")
private boolean allowLocalCache;
@Value("${promotion.demo.allowRemoteCache:true}")
private boolean allowRemoteCache;
public boolean isAllowLocalCache() {
return allowLocalCache;
}
public boolean isAllowRemoteCache() {
return allowRemoteCache;
}
}

View File

@@ -0,0 +1,24 @@
package com.tuling.tulingmall.domain;
import lombok.Data;
import java.math.BigDecimal;
/*往ES存取时的数据实体类*/
@Data
public class ProductESVo {
private String name;
private String keywords;
private String subTitle;
private BigDecimal price;
private BigDecimal promotionPrice;
private BigDecimal originalPrice;
private String pic;
private Integer saleCount;
private Long brandId;
private String brandName;
private Long categoryId;
private String categoryName;
}

View File

@@ -0,0 +1,25 @@
package com.tuling.tulingmall.feignapi.promotion;
import com.tuling.tulingmall.common.api.CommonResult;
import com.tuling.tulingmall.promotion.domain.FlashPromotionProduct;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.List;
/**
* @desc:
*/
@FeignClient(name = "tulingmall-promotion",path = "/seckill")
public interface PromotionFeignApi {
/*获得秒杀内容*/
@RequestMapping(value = "/getHomeSecKillProductList", method = RequestMethod.GET)
@ResponseBody
CommonResult<List<FlashPromotionProduct>> getHomeSecKillProductList(
@RequestParam(value = "secKillId") long secKillId,
@RequestParam(value = "status") long status);
}

View File

@@ -0,0 +1,514 @@
package com.tuling.tulingmall.model;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
public class PmsProduct implements Serializable {
private Long id;
private Long brandId;
private Long productCategoryId;
private Long feightTemplateId;
private Long productAttributeCategoryId;
private String name;
private String pic;
@ApiModelProperty(value = "货号")
private String productSn;
@ApiModelProperty(value = "删除状态0->未删除1->已删除")
private Integer deleteStatus;
@ApiModelProperty(value = "上架状态0->下架1->上架")
private Integer publishStatus;
@ApiModelProperty(value = "新品状态:0->不是新品1->新品")
private Integer newStatus;
@ApiModelProperty(value = "推荐状态0->不推荐1->推荐")
private Integer recommandStatus;
@ApiModelProperty(value = "审核状态0->未审核1->审核通过")
private Integer verifyStatus;
@ApiModelProperty(value = "排序")
private Integer sort;
@ApiModelProperty(value = "销量")
private Integer sale;
private BigDecimal price;
@ApiModelProperty(value = "促销价格")
private BigDecimal promotionPrice;
@ApiModelProperty(value = "赠送的成长值")
private Integer giftGrowth;
@ApiModelProperty(value = "赠送的积分")
private Integer giftPoint;
@ApiModelProperty(value = "限制使用的积分数")
private Integer usePointLimit;
@ApiModelProperty(value = "副标题")
private String subTitle;
@ApiModelProperty(value = "市场价")
private BigDecimal originalPrice;
@ApiModelProperty(value = "库存")
private Integer stock;
@ApiModelProperty(value = "库存预警值")
private Integer lowStock;
@ApiModelProperty(value = "单位")
private String unit;
@ApiModelProperty(value = "商品重量,默认为克")
private BigDecimal weight;
@ApiModelProperty(value = "是否为预告商品0->不是1->是")
private Integer previewStatus;
@ApiModelProperty(value = "以逗号分割的产品服务1->无忧退货2->快速退款3->免费包邮")
private String serviceIds;
private String keywords;
private String note;
@ApiModelProperty(value = "画册图片连产品图片限制为5张以逗号分割")
private String albumPics;
private String detailTitle;
@ApiModelProperty(value = "促销开始时间")
private Date promotionStartTime;
@ApiModelProperty(value = "促销结束时间")
private Date promotionEndTime;
@ApiModelProperty(value = "活动限购数量")
private Integer promotionPerLimit;
@ApiModelProperty(value = "促销类型0->没有促销使用原价;1->使用促销价2->使用会员价3->使用阶梯价格4->使用满减价格5->限时购")
private Integer promotionType;
@ApiModelProperty(value = "品牌名称")
private String brandName;
@ApiModelProperty(value = "商品分类名称")
private String productCategoryName;
@ApiModelProperty(value = "商品描述")
private String description;
private String detailDesc;
@ApiModelProperty(value = "产品详情网页内容")
private String detailHtml;
@ApiModelProperty(value = "移动端网页详情")
private String detailMobileHtml;
private static final long serialVersionUID = 1L;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Long getBrandId() {
return brandId;
}
public void setBrandId(Long brandId) {
this.brandId = brandId;
}
public Long getProductCategoryId() {
return productCategoryId;
}
public void setProductCategoryId(Long productCategoryId) {
this.productCategoryId = productCategoryId;
}
public Long getFeightTemplateId() {
return feightTemplateId;
}
public void setFeightTemplateId(Long feightTemplateId) {
this.feightTemplateId = feightTemplateId;
}
public Long getProductAttributeCategoryId() {
return productAttributeCategoryId;
}
public void setProductAttributeCategoryId(Long productAttributeCategoryId) {
this.productAttributeCategoryId = productAttributeCategoryId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPic() {
return pic;
}
public void setPic(String pic) {
this.pic = pic;
}
public String getProductSn() {
return productSn;
}
public void setProductSn(String productSn) {
this.productSn = productSn;
}
public Integer getDeleteStatus() {
return deleteStatus;
}
public void setDeleteStatus(Integer deleteStatus) {
this.deleteStatus = deleteStatus;
}
public Integer getPublishStatus() {
return publishStatus;
}
public void setPublishStatus(Integer publishStatus) {
this.publishStatus = publishStatus;
}
public Integer getNewStatus() {
return newStatus;
}
public void setNewStatus(Integer newStatus) {
this.newStatus = newStatus;
}
public Integer getRecommandStatus() {
return recommandStatus;
}
public void setRecommandStatus(Integer recommandStatus) {
this.recommandStatus = recommandStatus;
}
public Integer getVerifyStatus() {
return verifyStatus;
}
public void setVerifyStatus(Integer verifyStatus) {
this.verifyStatus = verifyStatus;
}
public Integer getSort() {
return sort;
}
public void setSort(Integer sort) {
this.sort = sort;
}
public Integer getSale() {
return sale;
}
public void setSale(Integer sale) {
this.sale = sale;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public BigDecimal getPromotionPrice() {
return promotionPrice;
}
public void setPromotionPrice(BigDecimal promotionPrice) {
this.promotionPrice = promotionPrice;
}
public Integer getGiftGrowth() {
return giftGrowth;
}
public void setGiftGrowth(Integer giftGrowth) {
this.giftGrowth = giftGrowth;
}
public Integer getGiftPoint() {
return giftPoint;
}
public void setGiftPoint(Integer giftPoint) {
this.giftPoint = giftPoint;
}
public Integer getUsePointLimit() {
return usePointLimit;
}
public void setUsePointLimit(Integer usePointLimit) {
this.usePointLimit = usePointLimit;
}
public String getSubTitle() {
return subTitle;
}
public void setSubTitle(String subTitle) {
this.subTitle = subTitle;
}
public BigDecimal getOriginalPrice() {
return originalPrice;
}
public void setOriginalPrice(BigDecimal originalPrice) {
this.originalPrice = originalPrice;
}
public Integer getStock() {
return stock;
}
public void setStock(Integer stock) {
this.stock = stock;
}
public Integer getLowStock() {
return lowStock;
}
public void setLowStock(Integer lowStock) {
this.lowStock = lowStock;
}
public String getUnit() {
return unit;
}
public void setUnit(String unit) {
this.unit = unit;
}
public BigDecimal getWeight() {
return weight;
}
public void setWeight(BigDecimal weight) {
this.weight = weight;
}
public Integer getPreviewStatus() {
return previewStatus;
}
public void setPreviewStatus(Integer previewStatus) {
this.previewStatus = previewStatus;
}
public String getServiceIds() {
return serviceIds;
}
public void setServiceIds(String serviceIds) {
this.serviceIds = serviceIds;
}
public String getKeywords() {
return keywords;
}
public void setKeywords(String keywords) {
this.keywords = keywords;
}
public String getNote() {
return note;
}
public void setNote(String note) {
this.note = note;
}
public String getAlbumPics() {
return albumPics;
}
public void setAlbumPics(String albumPics) {
this.albumPics = albumPics;
}
public String getDetailTitle() {
return detailTitle;
}
public void setDetailTitle(String detailTitle) {
this.detailTitle = detailTitle;
}
public Date getPromotionStartTime() {
return promotionStartTime;
}
public void setPromotionStartTime(Date promotionStartTime) {
this.promotionStartTime = promotionStartTime;
}
public Date getPromotionEndTime() {
return promotionEndTime;
}
public void setPromotionEndTime(Date promotionEndTime) {
this.promotionEndTime = promotionEndTime;
}
public Integer getPromotionPerLimit() {
return promotionPerLimit;
}
public void setPromotionPerLimit(Integer promotionPerLimit) {
this.promotionPerLimit = promotionPerLimit;
}
public Integer getPromotionType() {
return promotionType;
}
public void setPromotionType(Integer promotionType) {
this.promotionType = promotionType;
}
public String getBrandName() {
return brandName;
}
public void setBrandName(String brandName) {
this.brandName = brandName;
}
public String getProductCategoryName() {
return productCategoryName;
}
public void setProductCategoryName(String productCategoryName) {
this.productCategoryName = productCategoryName;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getDetailDesc() {
return detailDesc;
}
public void setDetailDesc(String detailDesc) {
this.detailDesc = detailDesc;
}
public String getDetailHtml() {
return detailHtml;
}
public void setDetailHtml(String detailHtml) {
this.detailHtml = detailHtml;
}
public String getDetailMobileHtml() {
return detailMobileHtml;
}
public void setDetailMobileHtml(String detailMobileHtml) {
this.detailMobileHtml = detailMobileHtml;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName());
sb.append(" [");
sb.append("Hash = ").append(hashCode());
sb.append(", id=").append(id);
sb.append(", brandId=").append(brandId);
sb.append(", productCategoryId=").append(productCategoryId);
sb.append(", feightTemplateId=").append(feightTemplateId);
sb.append(", productAttributeCategoryId=").append(productAttributeCategoryId);
sb.append(", name=").append(name);
sb.append(", pic=").append(pic);
sb.append(", productSn=").append(productSn);
sb.append(", deleteStatus=").append(deleteStatus);
sb.append(", publishStatus=").append(publishStatus);
sb.append(", newStatus=").append(newStatus);
sb.append(", recommandStatus=").append(recommandStatus);
sb.append(", verifyStatus=").append(verifyStatus);
sb.append(", sort=").append(sort);
sb.append(", sale=").append(sale);
sb.append(", price=").append(price);
sb.append(", promotionPrice=").append(promotionPrice);
sb.append(", giftGrowth=").append(giftGrowth);
sb.append(", giftPoint=").append(giftPoint);
sb.append(", usePointLimit=").append(usePointLimit);
sb.append(", subTitle=").append(subTitle);
sb.append(", originalPrice=").append(originalPrice);
sb.append(", stock=").append(stock);
sb.append(", lowStock=").append(lowStock);
sb.append(", unit=").append(unit);
sb.append(", weight=").append(weight);
sb.append(", previewStatus=").append(previewStatus);
sb.append(", serviceIds=").append(serviceIds);
sb.append(", keywords=").append(keywords);
sb.append(", note=").append(note);
sb.append(", albumPics=").append(albumPics);
sb.append(", detailTitle=").append(detailTitle);
sb.append(", promotionStartTime=").append(promotionStartTime);
sb.append(", promotionEndTime=").append(promotionEndTime);
sb.append(", promotionPerLimit=").append(promotionPerLimit);
sb.append(", promotionType=").append(promotionType);
sb.append(", brandName=").append(brandName);
sb.append(", productCategoryName=").append(productCategoryName);
sb.append(", description=").append(description);
sb.append(", detailDesc=").append(detailDesc);
sb.append(", detailHtml=").append(detailHtml);
sb.append(", detailMobileHtml=").append(detailMobileHtml);
sb.append(", serialVersionUID=").append(serialVersionUID);
sb.append("]");
return sb.toString();
}
}

View File

@@ -0,0 +1,9 @@
package com.tuling.tulingmall.service;
public interface IProcessCanalData {
void connect();
void disConnect();
void processData();
}

View File

@@ -0,0 +1,248 @@
package com.tuling.tulingmall.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.tuling.tulingmall.config.CanalProductConfig;
import com.tuling.tulingmall.domain.ProductESVo;
import com.tuling.tulingmall.service.IProcessCanalData;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
/**
* 演示异构数据库下的实时数据同步,
* 商品数据的变化从MySQL同步到ElasticSearch本类未经测试仅供参考
* 同时商品数据的变化从业务上来说也应该同步一份到Redis本次课程略过具体实现
*/
//@Service
@Slf4j
public class ProductESData implements IProcessCanalData {
private final static String T_ID = "id";
private final static String T_NAME = "name";
private final static String T_KEYWORDS = "keywords";
private final static String T_SUB_TITLE = "sub_title";
private final static String T_PRICE = "price";
private final static String T_PROMOTION_PRICE = "promotion_price";
private final static String T_ORIGINAL_PRICE = "original_price";
private final static String T_PIC = "pic";
private final static String T_SALE = "sale";
private final static String T_BRAND_ID = "brand_id";
private final static String T_BRAND_NAME = "brand_name";
private final static String T_PRODUCT_CATEGORY_ID = "product_category_id";
private final static String T_PRODUCT_CATEGORY_NAME = "product_category_name";
@Value("${canal.product.indexName}")
private String indexName;
@Autowired
@Qualifier("productConnector")
private CanalConnector connector;
@Value("${canal.product.subscribe:server}")
private String subscribe;
@Value("${canal.product.batchSize}")
private int batchSize;
@Autowired
private RestHighLevelClient restHighLevelClient;
@PostConstruct
@Override
public void connect() {
connector.connect();
if("server".equals(subscribe))
connector.subscribe(null);
else
connector.subscribe(subscribe);
connector.rollback();
}
@PreDestroy
@Override
public void disConnect() {
connector.disconnect();
}
@Async
@Scheduled(initialDelayString = "${canal.product.initialDelay:5000}", fixedDelayString = "${canal.product.fixedDelay:1000}")
@Override
public void processData() {
try {
if (!connector.checkValid()) {
log.warn("与Canal服务器的连接失效重连下个周期再检查数据变更");
this.connect();
} else {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
log.info("本次没有检测到商品数据更新。");
} else {
log.info("商品数据本次共有[{}]次更新需要处理", size);
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
log.debug("数据变更详情来自binglog[{}.{}],数据源{}.{},变更类型{}",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), tableName, eventType);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : columns) {
if(column.getName().equals("id")) {
deleteDoc(column.getValue());
break;
}
}
} else if (eventType == CanalEntry.EventType.INSERT) {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
ProductESVo productESVo = new ProductESVo();
String docId = makeVo(columns,productESVo);
insertDoc(docId,productESVo);
} else {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
ProductESVo productESVo = new ProductESVo();
String docId = makeVo(columns,productESVo);
if(null != docId){
if(null == productESVo){
log.info("商品的删除状态字段update为已删除从ES中移除");
deleteDoc(docId);
}else updateDoc(docId,productESVo);
}
}
}
}
connector.ack(batchId); // 提交确认
}
}
} catch (Exception e) {
log.error("处理商品Canal同步数据失效请检查", e);
}
}
private String makeVo(List<CanalEntry.Column> columns,ProductESVo productESVo){
String docId = null;
for (CanalEntry.Column column : columns) {
String colName = column.getName();
String colValue = column.getValue();
if(colName.equals(T_ID)) {
docId = colValue;
}else if(colName.equals(T_NAME)) {
productESVo.setName(colValue);
} if(colName.equals(T_KEYWORDS)) {
productESVo.setKeywords(colValue);
} if(colName.equals(T_SUB_TITLE)) {
productESVo.setSubTitle(colValue);
} if(colName.equals(T_PRICE)) {
productESVo.setPrice(new BigDecimal(colValue));
} if(colName.equals(T_PROMOTION_PRICE)) {
productESVo.setPromotionPrice(new BigDecimal(colValue));
} if(colName.equals(T_ORIGINAL_PRICE)) {
productESVo.setOriginalPrice(new BigDecimal(colValue));
} if(colName.equals(T_PIC)) {
productESVo.setPic(colValue);
} if(colName.equals(T_SALE)) {
productESVo.setSaleCount(Integer.valueOf(colValue));
} if(colName.equals(T_BRAND_ID)) {
productESVo.setBrandId(Long.valueOf(colValue));
} if(colName.equals(T_BRAND_NAME)) {
productESVo.setBrandName(colValue);
} if(colName.equals(T_PRODUCT_CATEGORY_ID)) {
productESVo.setCategoryId(Long.valueOf(colValue));
} if(colName.equals(T_PRODUCT_CATEGORY_NAME)) {
productESVo.setCategoryName(colValue);
} if(colName.equals("delete_status")) {
if(1 == Integer.valueOf(colValue)){
productESVo = null;
}
}
}
return docId;
}
private void deleteDoc(String docId) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest(indexName, docId);
DeleteResponse deleteResponse =
restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
String indexDoc = indexName+"/"+docId;
if(deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND){
log.warn("删除不存在的文档 {}", indexDoc);
}else{
log.info("删除文档 {} 成功",indexDoc);
}
}
private void updateDoc(String docId,ProductESVo productESVo) throws IOException {
String productJson = JSONObject.toJSONString(productESVo);
/*XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
xContentBuilder.startObject();
for (Map.Entry<String, String> entry : updateField.entrySet()) {
xContentBuilder.field(entry.getKey(), entry.getValue());
}
xContentBuilder.endObject();*/
UpdateRequest request =
new UpdateRequest(indexName, docId).doc(productJson,XContentType.JSON);
request.docAsUpsert(true);
UpdateResponse updateResponse =
restHighLevelClient.update(request, RequestOptions.DEFAULT);
String indexDoc = indexName+"/"+docId;
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
log.info("文档 {} 不存在,更新变更为创建成功",indexDoc);
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
log.info("文档 {} 不存在,更新成功",indexDoc);
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
log.warn("更新操作里文档 {} 被删除,请检查",indexDoc);
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
log.warn("文档 {} 未做任何更新操作,请检查",indexDoc);
}
}
private void insertDoc(String docId, ProductESVo productESVo) throws IOException {
IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest.id(docId);
String productJson = JSONObject.toJSONString(productESVo);
indexRequest.source(productJson, XContentType.JSON);
IndexResponse indexResponse =
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
if(indexResponse!=null){
String id = indexResponse.getId();
if(indexResponse.getResult() == DocWriteResponse.Result.CREATED){
log.info("新增文档成功,id = {}",id);
}else if(indexResponse.getResult() == DocWriteResponse.Result.UPDATED){
log.warn("新增转为覆盖文档成功,id = {}",id);
}
}
}
}

View File

@@ -0,0 +1,118 @@
package com.tuling.tulingmall.service.impl;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.tuling.tulingmall.config.PromotionRedisKey;
import com.tuling.tulingmall.rediscomm.util.RedisClusterUtil;
import com.tuling.tulingmall.service.IProcessCanalData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@Service
@Slf4j
public class PromotionData implements IProcessCanalData {
private final static String SMS_HOME_ADVERTISE = "sms_home_advertise";
private final static String SMS_HOME_BRAND = "sms_home_brand";
private final static String SMS_HOME_NEW_PRODUCT = "sms_home_new_product";
private final static String SMS_HOME_RECOMMEND_PRODUCT = "sms_home_recommend_product";
/*存储从表名到Redis缓存的键*/
private Map<String,String> tableMapKey = new HashMap<>();
@Autowired
@Qualifier("promotionConnector")
private CanalConnector connector;
@Autowired
private PromotionRedisKey promotionRedisKey;
@Autowired
private RedisClusterUtil redisOpsExtUtil;
@Value("${canal.promotion.subscribe:server}")
private String subscribe;
@Value("${canal.promotion.batchSize}")
private int batchSize;
@PostConstruct
@Override
public void connect() {
tableMapKey.put(SMS_HOME_ADVERTISE,promotionRedisKey.getHomeAdvertiseKey());
tableMapKey.put(SMS_HOME_BRAND,promotionRedisKey.getBrandKey());
tableMapKey.put(SMS_HOME_NEW_PRODUCT,promotionRedisKey.getNewProductKey());
tableMapKey.put(SMS_HOME_RECOMMEND_PRODUCT,promotionRedisKey.getRecProductKey());
connector.connect();
if("server".equals(subscribe))
connector.subscribe(null);
else
connector.subscribe(subscribe);
connector.rollback();
}
@PreDestroy
@Override
public void disConnect() {
connector.disconnect();
}
@Async
@Scheduled(initialDelayString="${canal.promotion.initialDelay:5000}",fixedDelayString = "${canal.promotion.fixedDelay:5000}")
@Override
public void processData() {
try {
if(!connector.checkValid()){
log.warn("与Canal服务器的连接失效重连下个周期再检查数据变更");
this.connect();
}else{
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
log.info("本次[{}]没有检测到促销数据更新。",batchId);
}else{
log.info("本次[{}]促销数据本次共有[{}]次更新需要处理",batchId,size);
/*一个表在一次周期内可能会被修改多次而对Redis缓存的处理只需要处理一次即可*/
Set<String> factKeys = new HashSet<>();
for(CanalEntry.Entry entry : message.getEntries()){
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
if(log.isDebugEnabled()){
CanalEntry.EventType eventType = rowChange.getEventType();
log.debug("数据变更详情来自binglog[{}.{}],数据源{}.{},变更类型{}",
entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),tableName,eventType);
}
factKeys.add(tableMapKey.get(tableName));
}
for(String key : factKeys){
if(StringUtils.isNotEmpty(key)) redisOpsExtUtil.delete(key);
}
connector.ack(batchId); // 提交确认
log.info("本次[{}]处理促销Canal同步数据完成",batchId);
}
}
} catch (Exception e) {
log.error("处理促销Canal同步数据失效请检查",e);
}
}
}

View File

@@ -0,0 +1,207 @@
package com.tuling.tulingmall.service.impl;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import com.tuling.tulingmall.config.PromotionRedisKey;
import com.tuling.tulingmall.feignapi.promotion.PromotionFeignApi;
import com.tuling.tulingmall.promotion.domain.FlashPromotionProduct;
import com.tuling.tulingmall.rediscomm.util.RedisClusterUtil;
import com.tuling.tulingmall.rediscomm.util.RedisSingleUtil;
import com.tuling.tulingmall.service.IProcessCanalData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class SecKillData implements IProcessCanalData {
private final static String SECKILL_STATUS = "status";
private final static String SECKILL_ID = "id";
private final static int ON_SECKILL_STATUS = 1;
private final static int OFF_SECKILL_STATUS = 0;
@Autowired
@Qualifier("secKillConnector")
private CanalConnector connector;
@Autowired
private PromotionRedisKey promotionRedisKey;
@Autowired
private RedisClusterUtil homeRedisOpsExtUtil;
@Autowired
private RedisSingleUtil secKillStockUtil;
@Value("${canal.seckill.subscribe:server}")
private String subscribe;
@Autowired
private PromotionFeignApi promotionFeignApi;
@Value("${canal.promotion.batchSize}")
private int batchSize;
@PostConstruct
@Override
public void connect() {
connector.connect();
if("server".equals(subscribe))
connector.subscribe(null);
else
connector.subscribe(subscribe);
connector.rollback();
}
@PreDestroy
@Override
public void disConnect() {
connector.disconnect();
}
@Async
@Scheduled(initialDelayString="${canal.seckill.initialDelay:5000}",fixedDelayString = "${canal.seckill.fixedDelay:5000}")
@Override
public void processData() {
try {
if(!connector.checkValid()){
log.warn("与Canal服务器的连接失效重连下个周期再检查数据变更");
this.connect();
}else{
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
log.info("本次[{}]没有检测到秒杀数据更新。",batchId);
}else{
log.info("本次[{}]秒杀数据本次共有[{}]次更新需要处理",batchId,size);
for(CanalEntry.Entry entry : message.getEntries()){
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
if(log.isDebugEnabled()){
log.debug("数据变更详情来自binglog[{}.{}],数据源{}.{},变更类型{}",
entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),tableName,eventType);
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
long secKillId = -1L;
int secKillStatus = -1;
if (eventType == CanalEntry.EventType.DELETE) {/*秒杀活动被删除*/
for (CanalEntry.Column column : columns) {
if(column.getName().equals(SECKILL_ID)) {
secKillId = Long.valueOf(column.getValue());
break;
}
}
secKillOffRedis(promotionFeignApi.getHomeSecKillProductList(secKillId,STATUS_OFF).getData());
} else if (eventType == CanalEntry.EventType.INSERT) { /*新增秒杀活动*/
for (CanalEntry.Column column : columns) {
if(column.getName().equals(SECKILL_STATUS)) {
secKillStatus = Integer.valueOf(column.getValue());
}
if(column.getName().equals(SECKILL_ID)) {
secKillId = Long.valueOf(column.getValue());
}
}
/*秒杀活动开启*/
if(ON_SECKILL_STATUS == secKillStatus){
secKillOnRedis(secKillId);
}
} else {/*秒杀活动变更*/
for (CanalEntry.Column column : columns) {
if(column.getName().equals(SECKILL_STATUS)) {
secKillStatus = Integer.valueOf(column.getValue());
}
if(column.getName().equals(SECKILL_ID)) {
secKillId = Long.valueOf(column.getValue());
}
}
/*秒杀活动开启*/
if(ON_SECKILL_STATUS == secKillStatus){
secKillOnRedis(secKillId);
}else{/*秒杀活动关闭*/
secKillOffRedis(promotionFeignApi.getHomeSecKillProductList(secKillId,STATUS_OFF).getData());
}
}
}
}
connector.ack(batchId); // 提交确认
log.info("本次[{}]处理秒杀Canal同步数据完成",batchId);
}
}
} catch (Exception e) {
log.error("处理秒杀Canal同步数据失效请检查",e);
}
}
private static final int STATUS_ON = 1;
private static final int STATUS_OFF = 0;
/* PO 本方法可以用pipeline优化*/
private void secKillOnRedis(long secKillId){
List<FlashPromotionProduct> result =
promotionFeignApi.getHomeSecKillProductList(secKillId,STATUS_ON).getData();
if(CollectionUtils.isEmpty(result)){
log.warn("开启了秒杀,但是没有找到秒杀对应产品,请检查!");
return;
}
final String homeSecKillKey = promotionRedisKey.getSecKillKey();
homeRedisOpsExtUtil.delete(homeSecKillKey);
long homeShowDuration = result.get(0).getFlashPromotionEndDate().getTime() - System.currentTimeMillis();
if(homeShowDuration > 0){
/*首页显示需要*/
homeRedisOpsExtUtil.putListAllRight(homeSecKillKey,result);
homeRedisOpsExtUtil.expire(homeSecKillKey,homeShowDuration, TimeUnit.MILLISECONDS);
}
/*秒杀服务需要*/
for(FlashPromotionProduct product : result){
String productKey = RedisKeyPrefixConst.SECKILL_PRODUCT_PREFIX + product.getFlashPromotionId()
+ ":" + product.getId();
String productCountKey = RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + product.getId();
secKillStockUtil.delete(productKey);
secKillStockUtil.delete(productCountKey);
if(homeShowDuration > 0){
secKillStockUtil.set(productKey,product,homeShowDuration, TimeUnit.MILLISECONDS);
secKillStockUtil.set(productCountKey,product.getFlashPromotionCount(),homeShowDuration, TimeUnit.MILLISECONDS);
}
}
}
/* PO 本方法可以用pipeline优化*/
private void secKillOffRedis(List<FlashPromotionProduct> products){
if(CollectionUtils.isEmpty(products)){
log.warn("关闭秒杀,但是没有找到秒杀对应产品,请检查!");
return;
}
final String secKillKey = promotionRedisKey.getSecKillKey();
homeRedisOpsExtUtil.delete(secKillKey);
/*秒杀服务需要*/
for(FlashPromotionProduct product : products){
secKillStockUtil.delete(RedisKeyPrefixConst.SECKILL_PRODUCT_PREFIX + product.getFlashPromotionId()
+ ":" + product.getId());
secKillStockUtil.delete(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + product.getId());
}
}
}

View File

@@ -0,0 +1,85 @@
package com.tuling.tulingmall.util;
import com.google.common.base.CaseFormat;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @description: 反射工具类,根据column调用相关方法
**/
@Slf4j
public class ClassUtil {
private static final String SET_METHOD_PREFIX = "set";
private static final Collection<Class<?>> GENERAL_CLASS_TYPE;
static {
GENERAL_CLASS_TYPE = Sets.<Class<?>>newHashSet(boolean.class, Boolean.class, int.class, Integer.class,
long.class, Long.class,double.class,Double.class,BigDecimal.class, Date.class,String.class);
}
public static String getSetterMethodName(final String columnName) {
if (columnName.contains("_")) {
return CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, SET_METHOD_PREFIX + "_" + columnName);
}
return SET_METHOD_PREFIX + String.valueOf(columnName.charAt(0)).toUpperCase() + columnName.substring(1, columnName.length());
}
public static void callSetterMethod(final Object object, final String methodName, final String setterValue) {
for (Class<?> each : GENERAL_CLASS_TYPE) {
try {
Method method = object.getClass().getMethod(methodName, each);
if (boolean.class == each || Boolean.class == each) {
method.invoke(object, Boolean.valueOf(setterValue));
} else if (int.class == each || Integer.class == each) {
method.invoke(object, Integer.parseInt(setterValue));
} else if (long.class == each || Long.class == each) {
method.invoke(object, Long.parseLong(setterValue));
} else if (double.class == each || Double.class == each) {
method.invoke(object, Double.parseDouble(setterValue));
} else if (BigDecimal.class == each) {
method.invoke(object, new BigDecimal(setterValue));
} else if (Date.class == each && StringUtils.isNotBlank(setterValue)) {
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
method.invoke(object, simpleDateFormat.parse(setterValue));
} catch (ParseException e) {
log.error("日期转换错误...!:{}",e.getMessage(),e.getCause());
//throw new RuntimeException("日期转换错误...!");
}
} else {
method.invoke(object, setterValue);
}
return;
} catch (final ReflectiveOperationException ignored) {
}
}
}
/**
* 获取clazz类对象以及父类对象的所有成员属性Field
* @param clazz
* @return
*/
public static Field[] getAllFields(Class<?> clazz){
List<Field> fieldList = new ArrayList<>();
while (clazz != null){
fieldList.addAll(Arrays.asList(clazz.getDeclaredFields()));
clazz = clazz.getSuperclass();
}
Field[] fields = new Field[fieldList.size()];
fieldList.toArray(fields);
return fields;
}
}

View File

@@ -0,0 +1,63 @@
server:
port: 8856
spring:
application:
name: tulingmall-canal
data:
elasticsearch:
rest:
uris: 218.76.8.107:9200
client:
reactive:
password: zhuangzhou
username: tlbaiqi
redis:
cluster:
nodes: 218.76.8.107:8001,218.76.8.107:8002,218.76.8.107:8003
lettuce:
pool:
max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 8 # 连接池中的最大空闲连接
min-idle: 0 # 连接池中的最小空闲连接
timeout: 3000ms # 连接超时时间(毫秒)
redisSingle:
host: 218.76.8.107
port: 8003
lettuce:
pool:
max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 8 # 连接池中的最大空闲连接
min-idle: 0 # 连接池中的最小空闲连接
timeout: 3000ms # 连接超时时间(毫秒)
canal:
server:
ip: 127.0.0.1
port: 9933
# product:
# destination: product
# indexName: product_db
# batchSize: 1000
promotion:
destination: promotion
batchSize: 1000
seckill:
destination: seckill
batchSize: 1000
feign:
client:
config:
default:
loggerLevel: full
readTimeout: 3000
connectTimeout: 3000
#rocketmq配置
#rocketmq:
# name-server: 192.168.65.164:9876,192.168.65.194:9876,192.168.65.161:9876,192.168.65.148:9876,192.168.65.224:9876,192.168.65.215:9876, #连接超时时间
# producer:
# send-message-timeout: 30000 #发送消息超时时间
# canal:
# topic: productDetailChange
# group: cache-sync

View File

@@ -0,0 +1,24 @@
spring:
application:
name: tulingmall-canal
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848 #配置中心的地址
file-extension: yml #配置文件结尾的配置
shared-configs[0]:
data-id: tulingmall-nacos.yml
group: DEFAULT_GROUP
refresh: true
shared-configs[1]:
data-id: tulingmall-redis.yml # redis服务集群配置
group: DEFAULT_GROUP
refresh: true
shared-configs[2]:
data-id: tulingmall-redis-key-dev.yml #在多个服务之间共享redis的key
group: DEFAULT_GROUP
refresh: true
discovery:
server-addr: 127.0.0.1:8848
profiles:
active: dev

View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
This is the JRebel configuration file. It maps the running application to your IDE workspace, enabling JRebel reloading for this project.
Refer to https://manuals.jrebel.com/jrebel/standalone/config.html for more information.
-->
<application generated-by="intellij" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.zeroturnaround.com" xsi:schemaLocation="http://www.zeroturnaround.com http://update.zeroturnaround.com/jrebel/rebel-2_3.xsd">
<id>tulingmall-canal</id>
<classpath>
<dir name="D:/GitSource/tmallV5/tulingmall-canal/target/classes">
</dir>
</classpath>
</application>