Skip to content

Commit

Permalink
refactor: remove gorm, use pgx for postgres (#696)
Browse files Browse the repository at this point in the history
* refactor: use pgx in project repo

* refactor: migrate namespace repository

* refactor: move secret repository to pgx

* fix: fix lint and test issues

* refactor: add foreign key for tenant in resource and job

* refactor: use pgx for resource and backup

* refactor: use pgx for job repository

* refactor: change repo for scheduler bc

* refactor: move migration command to server

* refactor: use pgx remove gorm

* refactor: update tenant bounded context

* refactor: update scheduler bounded context

* refactor: update resource bounded context

* refactor: update job bounded context

* fix: fix lint errors

* fix: fix issues for field scanning

* fix: job deleted_at value is always nil issue

* fix: job repository read write issues and alter job table

* fix: add scheduler repo tests (#698)

* fix: bug in __lib.py due to mishandling response from handle_pod_overlap (#695)

* fix: add scheduler repo tests

Co-authored-by: Anwar Hidayat <[email protected]>

* fix: fix time zone to utc

* fix: test time zone failures

* refactor: simplify job spec struct

* fix: scheduler job repo test failures due to spec changes

* feat: add 000046 migration down file

* fix: inferred dependency resolution issue when duplicated job name found across projects

Co-authored-by: Arinda Arif <[email protected]>
Co-authored-by: Yash Bhardwaj <[email protected]>
Co-authored-by: Anwar Hidayat <[email protected]>
  • Loading branch information
4 people authored Jan 5, 2023
1 parent 52918d1 commit 5899045
Show file tree
Hide file tree
Showing 62 changed files with 2,768 additions and 2,149 deletions.
2 changes: 0 additions & 2 deletions client/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/odpf/optimus/client/cmd/extension"
"github.com/odpf/optimus/client/cmd/initialize"
"github.com/odpf/optimus/client/cmd/job"
"github.com/odpf/optimus/client/cmd/migration"
"github.com/odpf/optimus/client/cmd/namespace"
"github.com/odpf/optimus/client/cmd/playground"
"github.com/odpf/optimus/client/cmd/plugin"
Expand Down Expand Up @@ -64,7 +63,6 @@ func New() *cli.Command {
deploy.NewDeployCommand(),
initialize.NewInitializeCommand(),
job.NewJobCommand(),
migration.NewMigrationCommand(),
namespace.NewNamespaceCommand(),
project.NewProjectCommand(),
replay.NewReplayCommand(),
Expand Down
2 changes: 1 addition & 1 deletion config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ log:
# # database connection string
# dsn: postgres://user:password@localhost:5432/database?sslmode=disable
#
# max_idle_connection: 5
# min_open_connection: 5
# max_open_connection: 10

# optimus supports multiple scheduler types
Expand Down
2 changes: 1 addition & 1 deletion config/config_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Deployer struct {

type DBConfig struct {
DSN string `mapstructure:"dsn"` // data source name e.g.: postgres://user:password@host:123/database?sslmode=disable
MaxIdleConnection int `mapstructure:"max_idle_connection" default:"10"` // maximum allowed idle DB connections
MinOpenConnection int `mapstructure:"min_open_connection" default:"5"` // minimum open DB connections
MaxOpenConnection int `mapstructure:"max_open_connection" default:"20"` // maximum allowed open DB connections
}

Expand Down
2 changes: 1 addition & 1 deletion config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (s *ConfigTestSuite) initExpectedServerConfig() {
s.expectedServerConfig.Serve.Deployer.QueueCapacity = 10
s.expectedServerConfig.Serve.DB = config.DBConfig{}
s.expectedServerConfig.Serve.DB.DSN = "postgres://user:password@localhost:5432/database?sslmode=disable"
s.expectedServerConfig.Serve.DB.MaxIdleConnection = 5
s.expectedServerConfig.Serve.DB.MinOpenConnection = 5
s.expectedServerConfig.Serve.DB.MaxOpenConnection = 10

s.expectedServerConfig.Scheduler = config.SchedulerConfig{}
Expand Down
10 changes: 5 additions & 5 deletions core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type JobService interface {
Update(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec) error
Delete(ctx context.Context, jobTenant tenant.Tenant, jobName job.Name, cleanFlag bool, forceFlag bool) (affectedDownstream []job.FullName, err error)
Get(ctx context.Context, jobTenant tenant.Tenant, jobName job.Name) (jobSpec *job.Job, err error)
GetTaskWithInfo(ctx context.Context, task *job.Task) (*job.Task, error)
GetTaskInfo(ctx context.Context, task job.Task) (*models.PluginInfoResponse, error)
GetByFilter(ctx context.Context, filters ...filter.FilterOpt) (jobSpecs []*job.Job, err error)
ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec, jobNamesWithValidationError []job.Name, logWriter writer.LogWriter) error
Refresh(ctx context.Context, projectName tenant.ProjectName, namespaceNames []string, jobNames []string, logWriter writer.LogWriter) error
Expand Down Expand Up @@ -359,15 +359,15 @@ func (jh *JobHandler) GetJobTask(ctx context.Context, req *pb.GetJobTaskRequest)
return nil, err
}

jobTask, err := jh.jobService.GetTaskWithInfo(ctx, jobResult.Spec().Task())
taskInfo, err := jh.jobService.GetTaskInfo(ctx, jobResult.Spec().Task())
if err != nil {
return nil, err
}

jobTaskSpec := &pb.JobTask{
Name: jobTask.Info().Name,
Description: jobTask.Info().Description,
Image: jobTask.Info().Image,
Name: taskInfo.Name,
Description: taskInfo.Description,
Image: taskInfo.Image,
}

jobTaskSpec.Destination = &pb.JobTask_Destination{
Expand Down
56 changes: 23 additions & 33 deletions core/job/handler/v1beta1/job_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

func toJobProto(jobEntity *job.Job) *pb.JobSpecification {
return &pb.JobSpecification{
Version: int32(jobEntity.Spec().Version().Int()),
Version: int32(jobEntity.Spec().Version()),
Name: jobEntity.Spec().Name().String(),
Owner: jobEntity.Spec().Owner().String(),
Owner: jobEntity.Spec().Owner(),
StartDate: jobEntity.Spec().Schedule().StartDate().String(),
EndDate: jobEntity.Spec().Schedule().EndDate().String(),
Interval: jobEntity.Spec().Schedule().Interval(),
Expand Down Expand Up @@ -61,20 +61,14 @@ func fromJobProtos(protoJobSpecs []*pb.JobSpecification) ([]*job.Spec, []job.Nam
}

func fromJobProto(js *pb.JobSpecification) (*job.Spec, error) {
version, err := job.VersionFrom(int(js.Version))
if err != nil {
return nil, err
}
version := int(js.Version)

name, err := job.NameFrom(js.Name)
if err != nil {
return nil, err
}

owner, err := job.OwnerFrom(js.Owner)
if err != nil {
return nil, err
}
owner := js.Owner

startDate, err := job.ScheduleDateFrom(js.StartDate)
if err != nil {
Expand Down Expand Up @@ -121,7 +115,7 @@ func fromJobProto(js *pb.JobSpecification) (*job.Spec, error) {
return nil, err
}

var taskConfig *job.Config
var taskConfig job.Config
if js.Config != nil {
taskConfig, err = toConfig(js.Config)
if err != nil {
Expand Down Expand Up @@ -173,14 +167,14 @@ func fromJobProto(js *pb.JobSpecification) (*job.Spec, error) {
}

if js.Assets != nil {
asset, err := job.NewAsset(js.Assets)
asset, err := job.AssetFrom(js.Assets)
if err != nil {
return nil, err
}
jobSpecBuilder = jobSpecBuilder.WithAsset(asset)
}

return jobSpecBuilder.Build(), nil
return jobSpecBuilder.Build()
}

func fromResourceURNs(resourceURNs []job.ResourceURN) []string {
Expand Down Expand Up @@ -228,11 +222,11 @@ func toHooks(hooksProto []*pb.JobSpecHook) ([]*job.Hook, error) {
if err != nil {
return nil, err
}
hookName, err := job.HookNameFrom(hookProto.Name)
hookSpec, err := job.NewHook(hookProto.Name, hookConfig)
if err != nil {
return nil, err
}
hooks[i] = job.NewHook(hookName, hookConfig)
hooks[i] = hookSpec
}
return hooks, nil
}
Expand All @@ -241,26 +235,26 @@ func fromHooks(hooks []*job.Hook) []*pb.JobSpecHook {
var hooksProto []*pb.JobSpecHook
for _, hook := range hooks {
hooksProto = append(hooksProto, &pb.JobSpecHook{
Name: hook.Name().String(),
Name: hook.Name(),
Config: fromConfig(hook.Config()),
})
}
return hooksProto
}

func fromAsset(jobAsset *job.Asset) map[string]string {
func fromAsset(jobAsset job.Asset) map[string]string {
var assets map[string]string
if jobAsset != nil {
assets = jobAsset.Assets()
assets = jobAsset
}
return assets
}

func toAlerts(notifiers []*pb.JobSpecification_Behavior_Notifiers) ([]*job.AlertSpec, error) {
alerts := make([]*job.AlertSpec, len(notifiers))
for i, notify := range notifiers {
alertOn := job.EventType(utils.FromEnumProto(notify.On.String(), "type"))
config, err := job.NewConfig(notify.Config)
alertOn := utils.FromEnumProto(notify.On.String(), "type")
config, err := job.ConfigFrom(notify.Config)
if err != nil {
return nil, err
}
Expand All @@ -277,9 +271,9 @@ func fromAlerts(jobAlerts []*job.AlertSpec) []*pb.JobSpecification_Behavior_Noti
var notifiers []*pb.JobSpecification_Behavior_Notifiers
for _, alert := range jobAlerts {
notifiers = append(notifiers, &pb.JobSpecification_Behavior_Notifiers{
On: pb.JobEvent_Type(pb.JobEvent_Type_value[utils.ToEnumProto(string(alert.On()), "type")]),
On: pb.JobEvent_Type(pb.JobEvent_Type_value[utils.ToEnumProto(alert.On(), "type")]),
Channels: alert.Channels(),
Config: alert.Config().Configs(),
Config: alert.Config(),
})
}
return notifiers
Expand All @@ -295,11 +289,7 @@ func toSpecUpstreams(upstreamProtos []*pb.JobDependency) (*job.UpstreamSpec, err
continue
}
httpUpstreamProto := upstream.HttpDependency
httpUpstreamName, err := job.NameFrom(httpUpstreamProto.Name)
if err != nil {
return nil, err
}
httpUpstream, err := job.NewSpecHTTPUpstreamBuilder(httpUpstreamName, httpUpstreamProto.Url).
httpUpstream, err := job.NewSpecHTTPUpstreamBuilder(httpUpstreamProto.Name, httpUpstreamProto.Url).
WithHeaders(httpUpstreamProto.Headers).
WithParams(httpUpstreamProto.Params).
Build()
Expand All @@ -326,7 +316,7 @@ func fromSpecUpstreams(upstreams *job.UpstreamSpec) []*pb.JobDependency {
for _, httpUpstream := range upstreams.HTTPUpstreams() {
dependencies = append(dependencies, &pb.JobDependency{
HttpDependency: &pb.HttpDependency{
Name: httpUpstream.Name().String(),
Name: httpUpstream.Name(),
Url: httpUpstream.URL(),
Headers: httpUpstream.Headers(),
Params: httpUpstream.Params(),
Expand Down Expand Up @@ -399,17 +389,17 @@ func fromMetadata(metadata *job.Metadata) *pb.JobMetadata {
}
}

func toConfig(configs []*pb.JobConfigItem) (*job.Config, error) {
func toConfig(configs []*pb.JobConfigItem) (job.Config, error) {
configMap := make(map[string]string, len(configs))
for _, config := range configs {
configMap[config.Name] = config.Value
}
return job.NewConfig(configMap)
return job.ConfigFrom(configMap)
}

func fromConfig(jobConfig *job.Config) []*pb.JobConfigItem {
func fromConfig(jobConfig job.Config) []*pb.JobConfigItem {
configs := []*pb.JobConfigItem{}
for configName, configValue := range jobConfig.Configs() {
for configName, configValue := range jobConfig {
configs = append(configs, &pb.JobConfigItem{Name: configName, Value: configValue})
}
return configs
Expand Down Expand Up @@ -473,7 +463,7 @@ func toHTTPUpstreamProtos(httpUpstreamSpecs []*job.SpecHTTPUpstream) []*pb.HttpD
var httpUpstreamProtos []*pb.HttpDependency
for _, httpUpstream := range httpUpstreamSpecs {
httpUpstreamProtos = append(httpUpstreamProtos, &pb.HttpDependency{
Name: httpUpstream.Name().String(),
Name: httpUpstream.Name(),
Url: httpUpstream.URL(),
Headers: httpUpstream.Headers(),
Params: httpUpstream.Params(),
Expand Down
Loading

0 comments on commit 5899045

Please sign in to comment.