循环依赖问题求助
global/database/mysql/mysql.go下定义了全局数据库连接对象global.DB:
var Db *gorm.DB
func ReturnsInstance() *gorm.DB {
var mysqlConfig = config.Config.SqlConfig
//sql日志记录
myLogger := logger.New(
//设置Logger
//NewMyWriter(),
//输出在控制台,方便debug
log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{
LogLevel: logger.Silent, //仅仅在控制台输出指定Debug的语句
IgnoreRecordNotFoundError: true, // 忽略ErrRecordNotFound(记录未找到)错误
Colorful: true, // 禁用彩色打印
},
)
b := retry.NewFibonacci(10 * time.Second)
ctx := context.Background()
if err := retry.Do(ctx, retry.WithMaxRetries(5, b), func(ctx context.Context) error {
// 创建链接
var err error
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&loc=Local", mysqlConfig.User, mysqlConfig.Password, mysqlConfig.IP, mysqlConfig.Port, mysqlConfig.Database)
Db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: myLogger,
})
if err != nil {
return err
}
if Db.Error != nil {
return err
}
return nil
}); err != nil {
// handle error
log.Fatalf("重连5次后依旧数据库链接错误,请检查mysql服务是否正常- %v \n", err)
}
return Db
}
global/global.go下定义init函数,项目启动时执行init函数,初始化各个示例,包括数据库连接和消费者:
func init() {
Logger = log.ReturnsInstance()
RedisDb = RedisDbFun.ReturnsInstance()
Db = mysql.ReturnsInstance()
Config = config.ReturnsInstance()
//普通队列的生产者
NormalProducer = msgQueue.ReturnsNormalInstance()
//延迟队列的生产者
DelayProducer = msgQueue.ReturnsDelayInstance()
//启动延时队列的消费者和普通队列的消费者
msgQueue.StartDelayConsumer()
msgQueue.StartNormalConsumer()
}
var (
Logger *logrus.Logger
Config *config.Info
Db *gorm.DB
RedisDb *redis.Client
NormalProducer *kafka.Conn
DelayProducer *kafka.Conn
)
然后调用global.Db即可进行数据库操作。
同时在global/msgQueue/consumer.go中定义了消费者,监听定时发布视频的消息,并调用setVideoIsVisible将对应视频的is_visible字段置为1:
// StartNormalConsumer 启动普通队列(即时处理)的消费者
func StartNormalConsumer() {
normalReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{consts.KafkaServerAddr},
Topic: consts.KafkaTopic,
StartOffset: kafka.LastOffset,
GroupID: "normal-consumer",
})
go func() {
for {
message, err := normalReader.ReadMessage(context.Background())
if err != nil {
log.Printf("读取普通队列的消息错误:%v", err)
continue
}
messsageBody := string(message.Value)
//处理定时发布视频的消息,此类消息形如:publishVideo_1234
if strings.HasPrefix(messsageBody, "publishVideo") {
//从message中提取视频的id
idStr := strings.Split(messsageBody, "_")[1]
videoId, err := strconv.ParseUint(idStr, 10, 0)
if err != nil {
log.Printf("提取视频id出错:%v", err)
}
if err = setVideoIsVisible(uint(videoId)); err != nil {
log.Printf("定时发布视频%d出错:%v", videoId, err)
}
}
//提交偏移量
if err := normalReader.CommitMessages(context.Background(), message); err != nil {
log.Printf("普通队列提交偏移量失败:%v", err)
}
}
}()
}
func setVideoIsVisible(id uint) error {
return global.Db.Model(&video.VideosContribution{}).Where("id = ?", id).Update("is_visible", 1).Error
}
消费者中调用了全局数据库连接对象,会报循环依赖错误,不知道如何解决,求解。
var Db *gorm.DB
func ReturnsInstance() *gorm.DB {
var mysqlConfig = config.Config.SqlConfig
//sql日志记录
myLogger := logger.New(
//设置Logger
//NewMyWriter(),
//输出在控制台,方便debug
log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{
LogLevel: logger.Silent, //仅仅在控制台输出指定Debug的语句
IgnoreRecordNotFoundError: true, // 忽略ErrRecordNotFound(记录未找到)错误
Colorful: true, // 禁用彩色打印
},
)
b := retry.NewFibonacci(10 * time.Second)
ctx := context.Background()
if err := retry.Do(ctx, retry.WithMaxRetries(5, b), func(ctx context.Context) error {
// 创建链接
var err error
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&loc=Local", mysqlConfig.User, mysqlConfig.Password, mysqlConfig.IP, mysqlConfig.Port, mysqlConfig.Database)
Db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: myLogger,
})
if err != nil {
return err
}
if Db.Error != nil {
return err
}
return nil
}); err != nil {
// handle error
log.Fatalf("重连5次后依旧数据库链接错误,请检查mysql服务是否正常- %v \n", err)
}
return Db
}
global/global.go下定义init函数,项目启动时执行init函数,初始化各个示例,包括数据库连接和消费者:
func init() {
Logger = log.ReturnsInstance()
RedisDb = RedisDbFun.ReturnsInstance()
Db = mysql.ReturnsInstance()
Config = config.ReturnsInstance()
//普通队列的生产者
NormalProducer = msgQueue.ReturnsNormalInstance()
//延迟队列的生产者
DelayProducer = msgQueue.ReturnsDelayInstance()
//启动延时队列的消费者和普通队列的消费者
msgQueue.StartDelayConsumer()
msgQueue.StartNormalConsumer()
}
var (
Logger *logrus.Logger
Config *config.Info
Db *gorm.DB
RedisDb *redis.Client
NormalProducer *kafka.Conn
DelayProducer *kafka.Conn
)
然后调用global.Db即可进行数据库操作。
同时在global/msgQueue/consumer.go中定义了消费者,监听定时发布视频的消息,并调用setVideoIsVisible将对应视频的is_visible字段置为1:
// StartNormalConsumer 启动普通队列(即时处理)的消费者
func StartNormalConsumer() {
normalReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{consts.KafkaServerAddr},
Topic: consts.KafkaTopic,
StartOffset: kafka.LastOffset,
GroupID: "normal-consumer",
})
go func() {
for {
message, err := normalReader.ReadMessage(context.Background())
if err != nil {
log.Printf("读取普通队列的消息错误:%v", err)
continue
}
messsageBody := string(message.Value)
//处理定时发布视频的消息,此类消息形如:publishVideo_1234
if strings.HasPrefix(messsageBody, "publishVideo") {
//从message中提取视频的id
idStr := strings.Split(messsageBody, "_")[1]
videoId, err := strconv.ParseUint(idStr, 10, 0)
if err != nil {
log.Printf("提取视频id出错:%v", err)
}
if err = setVideoIsVisible(uint(videoId)); err != nil {
log.Printf("定时发布视频%d出错:%v", videoId, err)
}
}
//提交偏移量
if err := normalReader.CommitMessages(context.Background(), message); err != nil {
log.Printf("普通队列提交偏移量失败:%v", err)
}
}
}()
}
func setVideoIsVisible(id uint) error {
return global.Db.Model(&video.VideosContribution{}).Where("id = ?", id).Update("is_visible", 1).Error
}
消费者中调用了全局数据库连接对象,会报循环依赖错误,不知道如何解决,求解。
全部评论
相关推荐
10-18 16:05
快手_java后端开发(实习员工) 点赞 评论 收藏
分享
点赞 评论 收藏
分享
09-22 14:54
河南大学 Java 点赞 评论 收藏
分享