mirror of https://github.com/synctv-org/synctv
Refecrot: vendor backend
parent
b3e498d365
commit
c796738e6f
@ -1,48 +0,0 @@
|
|||||||
package conf
|
|
||||||
|
|
||||||
type VendorConfig struct {
|
|
||||||
Bilibili map[string]BilibiliConfig `yaml:"bilibili" hc:"default use local vendor"`
|
|
||||||
Alist map[string]AlistConfig `yaml:"alist" hc:"default use local vendor"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultVendorConfig() VendorConfig {
|
|
||||||
return VendorConfig{
|
|
||||||
Bilibili: nil,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Consul struct {
|
|
||||||
Endpoint string `yaml:"endpoint"`
|
|
||||||
Token string `yaml:"token,omitempty"`
|
|
||||||
TokenFile string `yaml:"token_file,omitempty"`
|
|
||||||
PathPrefix string `yaml:"path_prefix,omitempty"`
|
|
||||||
Namespace string `yaml:"namespace,omitempty"`
|
|
||||||
Partition string `yaml:"partition,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type Etcd struct {
|
|
||||||
Endpoints []string `yaml:"endpoints"`
|
|
||||||
Username string `yaml:"username,omitempty"`
|
|
||||||
Password string `yaml:"password,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type VendorBase struct {
|
|
||||||
ServerName string `yaml:"server_name" hc:"if use tls and grpc, servername must set the cert server name" env:"BILIBILI_SERVER_NAME"`
|
|
||||||
Endpoint string `yaml:"endpoint" env:"BILIBILI_ENDPOINT"`
|
|
||||||
JwtSecret string `yaml:"jwt_secret" env:"BILIBILI_JWT_SECRET"`
|
|
||||||
Scheme string `yaml:"scheme" lc:"grpc | http" env:"BILIBILI_SCHEME"`
|
|
||||||
Tls bool `yaml:"tls" env:"BILIBILI_TLS"`
|
|
||||||
CustomCAFile string `yaml:"custom_ca_file,omitempty" env:"BILIBILI_CUSTOM_CA_FILE"`
|
|
||||||
TimeOut string `yaml:"time_out" env:"BILIBILI_TIME_OUT"`
|
|
||||||
|
|
||||||
Consul Consul `yaml:"consul,omitempty" hc:"if use consul, must set the endpoint"`
|
|
||||||
Etcd Etcd `yaml:"etcd,omitempty" hc:"if use etcd, must set the endpoints"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type BilibiliConfig struct {
|
|
||||||
VendorBase `yaml:",inline"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type AlistConfig struct {
|
|
||||||
VendorBase `yaml:",inline"`
|
|
||||||
}
|
|
@ -0,0 +1,46 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/synctv-org/synctv/internal/model"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetAllVendorBackend() ([]*model.VendorBackend, error) {
|
||||||
|
var backends []*model.VendorBackend
|
||||||
|
err := db.Find(&backends).Error
|
||||||
|
return backends, HandleNotFound(err, "backends")
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateVendorBackend(backend *model.VendorBackend) error {
|
||||||
|
return db.Create(backend).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
func DeleteVendorBackend(endpoint string) error {
|
||||||
|
return db.Where("backend_endpoint = ?", endpoint).Delete(&model.VendorBackend{}).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
func DeleteVendorBackends(endpoints []string) error {
|
||||||
|
return db.Where("backend_endpoint IN ?", endpoints).Delete(&model.VendorBackend{}).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetVendorBackend(endpoint string) (*model.VendorBackend, error) {
|
||||||
|
var backend model.VendorBackend
|
||||||
|
err := db.Where("backend_endpoint = ?", endpoint).First(&backend).Error
|
||||||
|
return &backend, HandleNotFound(err, "backend")
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateOrSaveVendorBackend(backend *model.VendorBackend) (*model.VendorBackend, error) {
|
||||||
|
return backend, Transactional(func(tx *gorm.DB) error {
|
||||||
|
if err := tx.Where("backend_endpoint = ?", backend.Backend.Endpoint).First(&model.VendorBackend{}).Error; errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
return tx.Create(&backend).Error
|
||||||
|
} else {
|
||||||
|
return tx.Save(&backend).Error
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func SaveVendorBackend(backend *model.VendorBackend) error {
|
||||||
|
return db.Save(backend).Error
|
||||||
|
}
|
@ -0,0 +1,98 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/synctv-org/synctv/utils"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Consul struct {
|
||||||
|
ServerName string
|
||||||
|
Token string
|
||||||
|
TokenFile string
|
||||||
|
PathPrefix string
|
||||||
|
Namespace string
|
||||||
|
Partition string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Etcd struct {
|
||||||
|
ServerName string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Backend struct {
|
||||||
|
Endpoint string `gorm:"primaryKey" json:"endpoint"`
|
||||||
|
Comment string `gorm:"type:text" json:"comment"`
|
||||||
|
Tls bool `gorm:"default:false" json:"tls"`
|
||||||
|
JwtSecret string `json:"jwtSecret"`
|
||||||
|
CustomCAFile string `json:"customCaFile"`
|
||||||
|
TimeOut string `gorm:"default:10s" json:"timeOut"`
|
||||||
|
|
||||||
|
Consul Consul `gorm:"embedded;embeddedPrefix:consul_" json:"consul"`
|
||||||
|
Etcd Etcd `gorm:"embedded;embeddedPrefix:etcd_" json:"etcd"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type VendorBackend struct {
|
||||||
|
Backend Backend `gorm:"embedded;embeddedPrefix:backend_" json:"backend"`
|
||||||
|
UsedBy BackendUsedBy `gorm:"embedded;embeddedPrefix:used_by_" json:"usedBy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type BackendUsedBy struct {
|
||||||
|
Bilibili bool `gorm:"default:false" json:"bilibili"`
|
||||||
|
BilibiliBackendName string `json:"bilibiliBackendName"`
|
||||||
|
Alist bool `gorm:"default:false" json:"alist"`
|
||||||
|
AlistBackendName string `json:"alistBackendName"`
|
||||||
|
Emby bool `gorm:"default:false" json:"emby"`
|
||||||
|
EmbyBackendName string `json:"embyBackendName"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *VendorBackend) BeforeSave(tx *gorm.DB) error {
|
||||||
|
key := []byte(v.Backend.Endpoint)
|
||||||
|
var err error
|
||||||
|
if v.Backend.JwtSecret != "" {
|
||||||
|
if v.Backend.JwtSecret, err = utils.CryptoToBase64([]byte(v.Backend.JwtSecret), key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v.Backend.Consul.Token != "" {
|
||||||
|
if v.Backend.Consul.Token, err = utils.CryptoToBase64([]byte(v.Backend.Consul.Token), key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v.Backend.Etcd.Password != "" {
|
||||||
|
if v.Backend.Etcd.Password, err = utils.CryptoToBase64([]byte(v.Backend.Etcd.Password), key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *VendorBackend) AfterFind(tx *gorm.DB) error {
|
||||||
|
key := []byte(v.Backend.Endpoint)
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
data []byte
|
||||||
|
)
|
||||||
|
if v.Backend.JwtSecret != "" {
|
||||||
|
if data, err = utils.DecryptoFromBase64(v.Backend.JwtSecret, key); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
v.Backend.JwtSecret = string(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v.Backend.Consul.Token != "" {
|
||||||
|
if data, err = utils.DecryptoFromBase64(v.Backend.Consul.Token, key); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
v.Backend.Consul.Token = string(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v.Backend.Etcd.Password != "" {
|
||||||
|
if data, err = utils.DecryptoFromBase64(v.Backend.Etcd.Password, key); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
v.Backend.Etcd.Password = string(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -0,0 +1,76 @@
|
|||||||
|
package vendor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/synctv-org/vendors/api/emby"
|
||||||
|
embyService "github.com/synctv-org/vendors/service/emby"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EmbyInterface = emby.EmbyHTTPServer
|
||||||
|
|
||||||
|
func LoadEmbyClient(name string) EmbyInterface {
|
||||||
|
if cli, ok := backends.Load().emby[name]; ok && cli != nil {
|
||||||
|
return cli
|
||||||
|
}
|
||||||
|
return embyLocalClient
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
embyLocalClient EmbyInterface
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
embyLocalClient = embyService.NewEmbyService(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func EmbyLocalClient() EmbyInterface {
|
||||||
|
return embyLocalClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEmbyGrpcClient(conn *grpc.ClientConn) (EmbyInterface, error) {
|
||||||
|
if conn == nil {
|
||||||
|
return nil, errors.New("grpc client conn is nil")
|
||||||
|
}
|
||||||
|
conn.GetState()
|
||||||
|
return newGrpcEmby(emby.NewEmbyClient(conn)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ EmbyInterface = (*grpcEmby)(nil)
|
||||||
|
|
||||||
|
type grpcEmby struct {
|
||||||
|
client emby.EmbyClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGrpcEmby(client emby.EmbyClient) EmbyInterface {
|
||||||
|
return &grpcEmby{
|
||||||
|
client: client,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *grpcEmby) FsList(ctx context.Context, req *emby.FsListReq) (*emby.FsListResp, error) {
|
||||||
|
return e.client.FsList(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *grpcEmby) GetItem(ctx context.Context, req *emby.GetItemReq) (*emby.Item, error) {
|
||||||
|
return e.client.GetItem(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *grpcEmby) GetItems(ctx context.Context, req *emby.GetItemsReq) (*emby.GetItemsResp, error) {
|
||||||
|
return e.client.GetItems(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *grpcEmby) GetSystemInfo(ctx context.Context, req *emby.Empty) (*emby.SystemInfoResp, error) {
|
||||||
|
return e.client.GetSystemInfo(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *grpcEmby) Login(ctx context.Context, req *emby.LoginReq) (*emby.LoginResp, error) {
|
||||||
|
return e.client.Login(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *grpcEmby) Me(ctx context.Context, req *emby.MeReq) (*emby.MeResp, error) {
|
||||||
|
return e.client.Me(ctx, req)
|
||||||
|
}
|
@ -1,21 +1,329 @@
|
|||||||
package vendor
|
package vendor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-kratos/aegis/circuitbreaker"
|
||||||
|
"github.com/go-kratos/aegis/circuitbreaker/sre"
|
||||||
|
consul "github.com/go-kratos/kratos/contrib/registry/consul/v2"
|
||||||
|
"github.com/go-kratos/kratos/contrib/registry/etcd/v2"
|
||||||
klog "github.com/go-kratos/kratos/v2/log"
|
klog "github.com/go-kratos/kratos/v2/log"
|
||||||
|
"github.com/go-kratos/kratos/v2/middleware"
|
||||||
|
"github.com/go-kratos/kratos/v2/middleware/auth/jwt"
|
||||||
|
kcircuitbreaker "github.com/go-kratos/kratos/v2/middleware/circuitbreaker"
|
||||||
"github.com/go-kratos/kratos/v2/selector"
|
"github.com/go-kratos/kratos/v2/selector"
|
||||||
"github.com/go-kratos/kratos/v2/selector/wrr"
|
"github.com/go-kratos/kratos/v2/selector/wrr"
|
||||||
|
ggrpc "github.com/go-kratos/kratos/v2/transport/grpc"
|
||||||
|
"github.com/go-kratos/kratos/v2/transport/http"
|
||||||
|
jwtv4 "github.com/golang-jwt/jwt/v4"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/synctv-org/synctv/internal/conf"
|
"github.com/synctv-org/synctv/internal/model"
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Init(conf *conf.VendorConfig) error {
|
func init() {
|
||||||
klog.SetLogger(klog.NewStdLogger(log.StandardLogger().Writer()))
|
klog.SetLogger(klog.NewStdLogger(log.StandardLogger().Writer()))
|
||||||
selector.SetGlobalSelector(wrr.NewBuilder())
|
selector.SetGlobalSelector(wrr.NewBuilder())
|
||||||
if err := InitBilibiliVendors(conf.Bilibili); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
if err := InitAlistVendors(conf.Alist); err != nil {
|
|
||||||
return err
|
var backends atomic.Pointer[Backends]
|
||||||
|
|
||||||
|
type BackendConnInfo struct {
|
||||||
|
Conn *grpc.ClientConn
|
||||||
|
Info *model.VendorBackend
|
||||||
|
}
|
||||||
|
|
||||||
|
type Backends struct {
|
||||||
|
conns map[string]*BackendConnInfo
|
||||||
|
bilibili map[string]BilibiliInterface
|
||||||
|
alist map[string]AlistInterface
|
||||||
|
emby map[string]EmbyInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Backends) Conns() map[string]*BackendConnInfo {
|
||||||
|
return b.conns
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Backends) BilibiliClients() map[string]BilibiliInterface {
|
||||||
|
return b.bilibili
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Backends) AlistClients() map[string]AlistInterface {
|
||||||
|
return b.alist
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Backends) EmbyClients() map[string]EmbyInterface {
|
||||||
|
return b.emby
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBackends(ctx context.Context, conf []*model.VendorBackend) (*Backends, error) {
|
||||||
|
newConns := make(map[string]*BackendConnInfo, len(conf))
|
||||||
|
backends := &Backends{
|
||||||
|
conns: newConns,
|
||||||
|
bilibili: make(map[string]BilibiliInterface),
|
||||||
|
alist: make(map[string]AlistInterface),
|
||||||
|
emby: make(map[string]EmbyInterface),
|
||||||
|
}
|
||||||
|
for _, vb := range conf {
|
||||||
|
cc, err := NewGrpcClientConn(ctx, &vb.Backend)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if _, ok := newConns[vb.Backend.Endpoint]; ok {
|
||||||
|
return nil, fmt.Errorf("duplicate endpoint: %s", vb.Backend.Endpoint)
|
||||||
|
}
|
||||||
|
newConns[vb.Backend.Endpoint] = &BackendConnInfo{
|
||||||
|
Conn: cc,
|
||||||
|
Info: vb,
|
||||||
|
}
|
||||||
|
if vb.UsedBy.Bilibili {
|
||||||
|
if _, ok := backends.bilibili[vb.UsedBy.BilibiliBackendName]; ok {
|
||||||
|
return nil, fmt.Errorf("duplicate bilibili backend name: %s", vb.UsedBy.BilibiliBackendName)
|
||||||
|
}
|
||||||
|
cli, err := NewBilibiliGrpcClient(cc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
backends.bilibili[vb.UsedBy.BilibiliBackendName] = cli
|
||||||
|
}
|
||||||
|
if vb.UsedBy.Alist {
|
||||||
|
if _, ok := backends.alist[vb.UsedBy.AlistBackendName]; ok {
|
||||||
|
return nil, fmt.Errorf("duplicate alist backend name: %s", vb.UsedBy.AlistBackendName)
|
||||||
|
}
|
||||||
|
cli, err := NewAlistGrpcClient(cc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
backends.alist[vb.UsedBy.AlistBackendName] = cli
|
||||||
|
}
|
||||||
|
if vb.UsedBy.Emby {
|
||||||
|
if _, ok := backends.emby[vb.UsedBy.EmbyBackendName]; ok {
|
||||||
|
return nil, fmt.Errorf("duplicate emby backend name: %s", vb.UsedBy.EmbyBackendName)
|
||||||
|
}
|
||||||
|
cli, err := NewEmbyGrpcClient(cc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
backends.emby[vb.UsedBy.EmbyBackendName] = cli
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return backends, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoadBackends() *Backends {
|
||||||
|
return backends.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func StoreBackends(b *Backends) {
|
||||||
|
old := backends.Swap(b)
|
||||||
|
if old == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for k, conn := range old.conns {
|
||||||
|
conn.Conn.Close()
|
||||||
|
delete(old.conns, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGrpcClientConn(ctx context.Context, conf *model.Backend) (*grpc.ClientConn, error) {
|
||||||
|
if conf.Endpoint == "" {
|
||||||
|
return nil, errors.New("new grpc client failed, endpoint is empty")
|
||||||
|
}
|
||||||
|
middlewares := []middleware.Middleware{kcircuitbreaker.Client(kcircuitbreaker.WithCircuitBreaker(func() circuitbreaker.CircuitBreaker {
|
||||||
|
return sre.NewBreaker(
|
||||||
|
sre.WithRequest(25),
|
||||||
|
sre.WithWindow(time.Second*15),
|
||||||
|
)
|
||||||
|
}))}
|
||||||
|
|
||||||
|
if conf.JwtSecret != "" {
|
||||||
|
key := []byte(conf.JwtSecret)
|
||||||
|
middlewares = append(middlewares, jwt.Client(func(token *jwtv4.Token) (interface{}, error) {
|
||||||
|
return key, nil
|
||||||
|
}, jwt.WithSigningMethod(jwtv4.SigningMethodHS256)))
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []ggrpc.ClientOption{
|
||||||
|
ggrpc.WithMiddleware(middlewares...),
|
||||||
|
// ggrpc.WithOptions(grpc.WithBlock()),
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.TimeOut != "" {
|
||||||
|
timeout, err := time.ParseDuration(conf.TimeOut)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
opts = append(opts, ggrpc.WithTimeout(timeout))
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.Consul.ServerName != "" {
|
||||||
|
c := api.DefaultConfig()
|
||||||
|
c.Address = conf.Endpoint
|
||||||
|
c.Token = conf.Consul.Token
|
||||||
|
c.TokenFile = conf.Consul.TokenFile
|
||||||
|
c.PathPrefix = conf.Consul.PathPrefix
|
||||||
|
c.Namespace = conf.Consul.Namespace
|
||||||
|
c.Partition = conf.Consul.Partition
|
||||||
|
client, err := api.NewClient(c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
endpoint := fmt.Sprintf("discovery:///%s", conf.Consul.ServerName)
|
||||||
|
dis := consul.New(client)
|
||||||
|
opts = append(opts, ggrpc.WithEndpoint(endpoint), ggrpc.WithDiscovery(dis))
|
||||||
|
log.Infof("new grpc client with consul: %s", conf.Endpoint)
|
||||||
|
} else if conf.Etcd.ServerName != "" {
|
||||||
|
endpoint := fmt.Sprintf("discovery:///%s", conf.Etcd.ServerName)
|
||||||
|
cli, err := clientv3.New(clientv3.Config{
|
||||||
|
Endpoints: []string{conf.Endpoint},
|
||||||
|
Username: conf.Etcd.Username,
|
||||||
|
Password: conf.Etcd.Password,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dis := etcd.New(cli)
|
||||||
|
opts = append(opts, ggrpc.WithEndpoint(endpoint), ggrpc.WithDiscovery(dis))
|
||||||
|
log.Infof("new grpc client with etcd: %v", conf.Endpoint)
|
||||||
|
} else {
|
||||||
|
opts = append(opts, ggrpc.WithEndpoint(conf.Endpoint))
|
||||||
|
log.Infof("new grpc client with endpoint: %s", conf.Endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
con *grpc.ClientConn
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if conf.Tls {
|
||||||
|
var rootCAs *x509.CertPool
|
||||||
|
rootCAs, err = x509.SystemCertPool()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if conf.CustomCAFile != "" {
|
||||||
|
b, err := os.ReadFile(conf.CustomCAFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rootCAs.AppendCertsFromPEM(b)
|
||||||
|
}
|
||||||
|
opts = append(opts, ggrpc.WithTLSConfig(&tls.Config{
|
||||||
|
RootCAs: rootCAs,
|
||||||
|
}))
|
||||||
|
|
||||||
|
con, err = ggrpc.Dial(
|
||||||
|
ctx,
|
||||||
|
opts...,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
con, err = ggrpc.DialInsecure(
|
||||||
|
ctx,
|
||||||
|
opts...,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return con, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHttpClientConn(ctx context.Context, conf *model.Backend) (*http.Client, error) {
|
||||||
|
if conf.Endpoint == "" {
|
||||||
|
return nil, errors.New("new http client failed, endpoint is empty")
|
||||||
|
}
|
||||||
|
middlewares := []middleware.Middleware{kcircuitbreaker.Client(kcircuitbreaker.WithCircuitBreaker(func() circuitbreaker.CircuitBreaker {
|
||||||
|
return sre.NewBreaker(
|
||||||
|
sre.WithRequest(25),
|
||||||
|
sre.WithWindow(time.Second*15),
|
||||||
|
)
|
||||||
|
}))}
|
||||||
|
|
||||||
|
if conf.JwtSecret != "" {
|
||||||
|
key := []byte(conf.JwtSecret)
|
||||||
|
middlewares = append(middlewares, jwt.Client(func(token *jwtv4.Token) (interface{}, error) {
|
||||||
|
return key, nil
|
||||||
|
}, jwt.WithSigningMethod(jwtv4.SigningMethodHS256)))
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []http.ClientOption{
|
||||||
|
http.WithMiddleware(middlewares...),
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.TimeOut != "" {
|
||||||
|
timeout, err := time.ParseDuration(conf.TimeOut)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
opts = append(opts, http.WithTimeout(timeout))
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.Tls {
|
||||||
|
rootCAs, err := x509.SystemCertPool()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if conf.CustomCAFile != "" {
|
||||||
|
b, err := os.ReadFile(conf.CustomCAFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rootCAs.AppendCertsFromPEM(b)
|
||||||
|
}
|
||||||
|
opts = append(opts, http.WithTLSConfig(&tls.Config{
|
||||||
|
RootCAs: rootCAs,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.Consul.ServerName != "" {
|
||||||
|
c := api.DefaultConfig()
|
||||||
|
c.Address = conf.Endpoint
|
||||||
|
c.Token = conf.Consul.Token
|
||||||
|
c.TokenFile = conf.Consul.TokenFile
|
||||||
|
c.PathPrefix = conf.Consul.PathPrefix
|
||||||
|
c.Namespace = conf.Consul.Namespace
|
||||||
|
c.Partition = conf.Consul.Partition
|
||||||
|
client, err := api.NewClient(c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
endpoint := fmt.Sprintf("discovery:///%s", conf.Consul.ServerName)
|
||||||
|
dis := consul.New(client)
|
||||||
|
opts = append(opts, http.WithEndpoint(endpoint), http.WithDiscovery(dis))
|
||||||
|
log.Infof("new http client with consul: %s", conf.Endpoint)
|
||||||
|
} else if conf.Etcd.ServerName != "" {
|
||||||
|
endpoint := fmt.Sprintf("discovery:///%s", conf.Etcd.ServerName)
|
||||||
|
cli, err := clientv3.New(clientv3.Config{
|
||||||
|
Endpoints: []string{conf.Endpoint},
|
||||||
|
Username: conf.Etcd.Username,
|
||||||
|
Password: conf.Etcd.Password,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dis := etcd.New(cli)
|
||||||
|
opts = append(opts, http.WithEndpoint(endpoint), http.WithDiscovery(dis))
|
||||||
|
log.Infof("new http client with etcd: %v", conf.Endpoint)
|
||||||
|
} else {
|
||||||
|
opts = append(opts, http.WithEndpoint(conf.Endpoint))
|
||||||
|
log.Infof("new http client with endpoint: %s", conf.Endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
con, err := http.NewClient(
|
||||||
|
ctx,
|
||||||
|
opts...,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return nil
|
return con, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue