Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/backend/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type CacheIndexer struct {
stopChans chan struct{}
Pod kcache.Indexer
Event kcache.Indexer
Endpoints kcache.Indexer
}

func (c ClusterManager) Close() {
Expand Down Expand Up @@ -148,9 +149,14 @@ func buildCacheController(client *kubernetes.Clientset) *CacheIndexer {
eventIndexer, eventInformer := kcache.NewIndexerInformer(eventListWatcher, &v1.Event{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go eventInformer.Run(stopCh)

// create the endpoint watcher
endpointsListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", v1.NamespaceAll, fields.Everything())
endpointsIndexer, endpointsinformer := kcache.NewIndexerInformer(endpointsListWatcher, &v1.Endpoints{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go endpointsinformer.Run(stopCh)
return &CacheIndexer{
Pod: podIndexer,
Event: eventIndexer,
Endpoints: endpointsIndexer,
stopChans: stopCh,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/backend/controllers/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (c *IngressController) Prepare() {
perAction = models.PermissionDelete
}
if perAction != "" {
c.CheckPermission(models.PermissionTypeStatefulset, perAction)
c.CheckPermission(models.PermissionTypeIngress, perAction)
}
}

Expand Down Expand Up @@ -74,7 +74,7 @@ func (c *IngressController) List() {
param.Query["App__Id"] = c.AppId
} else if !c.User.Admin {
param.Query["App__AppUsers__User__Id__exact"] = c.User.Id
perName := models.PermissionModel.MergeName(models.PerMissionTypeIngress, models.PermissionRead)
perName := models.PermissionModel.MergeName(models.PermissionTypeIngress, models.PermissionRead)
param.Query["App__AppUsers__Group__Permissions__Permission__Name__contains"] = perName
param.Groupby = []string{"Id"}
}
Expand Down
58 changes: 55 additions & 3 deletions src/backend/controllers/kubernetes/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/Qihoo360/wayne/src/backend/resources/service"
"github.com/Qihoo360/wayne/src/backend/util/logs"
"github.com/Qihoo360/wayne/src/backend/workers/webhook"
kapi "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
)

type KubeServiceController struct {
Expand All @@ -21,6 +21,8 @@ func (c *KubeServiceController) URLMapping() {
c.Mapping("Get", c.Get)
c.Mapping("Offline", c.Offline)
c.Mapping("Deploy", c.Deploy)
c.Mapping("GetDetail", c.GetDetail)
c.Mapping("List", c.List)
}

func (c *KubeServiceController) Prepare() {
Expand All @@ -42,6 +44,56 @@ func (c *KubeServiceController) Prepare() {
}
}

// @Title GetDetail
// @Description find Deployment by cluster
// @Param cluster path string true "the cluster name"
// @Param namespace path string true "the namespace name"
// @Success 200 {object} service.ServiceDetail success
// @router /:service/detail/namespaces/:namespace/clusters/:cluster [get]
func (c *KubeServiceController) GetDetail() {
cluster := c.Ctx.Input.Param(":cluster")
namespace := c.Ctx.Input.Param(":namespace")
name := c.Ctx.Input.Param(":service")
manager, err := client.Manager(cluster)
if err != nil {
c.AbortBadRequestFormat("Cluster")
}
serviceDetail, err := service.GetServiceDetail(manager.Client, manager.Indexer, namespace, name)
if err != nil {
logs.Error("get kubernetes(%s) namespace(%s) service(%s) detail error: %s", cluster, namespace, name, err.Error())
c.AbortInternalServerError("get kubernetes service detail error.")
}
c.Success(serviceDetail)
}

// @Title List service
// @Description get all ingress in a kubernetes cluster
// @Param pageNo query int false "the page current no"
// @Param pageSize query int false "the page size"
// @Param filter query string false "column filter, ex. filter=name=test"
// @Param sortby query string false "column sorted by, ex. sortby=-id, '-' representation desc, and sortby=id representation asc"
// @Param cluster path string true "the cluster name"
// @Param namespace path string true "the namespace name"
// @Success 200 {object} common.Page success
// @router /namespaces/:namespace/clusters/:cluster [get]
func (c *KubeServiceController) List() {
param := c.BuildQueryParam()
cluster := c.Ctx.Input.Param(":cluster")
namespace := c.Ctx.Input.Param(":namespace")

manager, err := client.Manager(cluster)
if err != nil {
c.AbortBadRequestFormat("Cluster")
}
res, err := service.GetServicePage(manager.Client, namespace, param)
if err != nil {
logs.Error("list kubernetes(%s) namespace(%s) services error %v", cluster, namespace, err)
c.HandleError(err)
return
}
c.Success(res)
}

// @Title deploy
// @Description deploy tpl
// @Param body body string true "The tpl content"
Expand All @@ -50,7 +102,7 @@ func (c *KubeServiceController) Prepare() {
func (c *KubeServiceController) Deploy() {
serviceId := c.GetIntParamFromURL(":serviceId")
tplId := c.GetIntParamFromURL(":tplId")
var kubeService kapi.Service
var kubeService v1.Service
err := json.Unmarshal(c.Ctx.Input.RequestBody, &kubeService)
if err != nil {
logs.Error("Invalid service tpl %v", string(c.Ctx.Input.RequestBody))
Expand Down Expand Up @@ -121,7 +173,7 @@ func (c *KubeServiceController) Get() {
name := c.Ctx.Input.Param(":service")
cli, err := client.Client(cluster)
if err == nil {
result, err := service.GetServiceDetail(cli, name, namespace)
result, err := service.GetService(cli, name, namespace)
if err != nil {
logs.Error("get kubernetes service detail error.", cluster, namespace, name, err)
c.HandleError(err)
Expand Down
4 changes: 2 additions & 2 deletions src/backend/models/permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
PermissionTypeDaemonSet = "DAEMONSET"
PermissionBill = "BILL"
PermissionTypeAPIKey = "APIKEY"
PerMissionTypeIngress = "INGRESS"
PermissionTypeIngress = "INGRESS"
PermissionBlank = "_"
)

Expand Down Expand Up @@ -157,7 +157,7 @@ func (*permissionModel) GetPermissionTypeByPublishType(pType PublishType) (perTy
case PublishTypeCronJob:
perType = PermissionTypeCronjob
case PublishTypeIngress:
perType = PerMissionTypeIngress
perType = PermissionTypeIngress
}
return perType
}
78 changes: 78 additions & 0 deletions src/backend/resources/common/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2017 The Kubernetes Authors.
Comment thread
HZ89 marked this conversation as resolved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"fmt"

api "k8s.io/api/core/v1"
)

// Endpoint describes an endpoint that is host and a list of available ports for that host.
type Endpoint struct {
// Hostname, either as a domain name or IP address.
Host string `json:"host"`

// List of ports opened for this endpoint on the hostname.
Ports []ServicePort `json:"ports"`
}

// GetExternalEndpoints returns endpoints that are externally reachable for a service.
func GetExternalEndpoints(service *api.Service) []Endpoint {
Comment thread
wilhelmguo marked this conversation as resolved.
var externalEndpoints []Endpoint
if service.Spec.Type == api.ServiceTypeLoadBalancer {
for _, ingress := range service.Status.LoadBalancer.Ingress {
externalEndpoints = append(externalEndpoints, getExternalEndpoint(ingress, service.Spec.Ports))
}
}

for _, ip := range service.Spec.ExternalIPs {
externalEndpoints = append(externalEndpoints, Endpoint{
Host: ip,
Ports: GetServicePorts(service.Spec.Ports),
})
}

return externalEndpoints
}

// GetInternalEndpoint returns internal endpoint name for the given service properties, e.g.,
// "my-service.namespace 80/TCP" or "my-service 53/TCP,53/UDP".
func GetInternalEndpoint(serviceName, namespace string, ports []api.ServicePort) Endpoint {
name := serviceName

if namespace != api.NamespaceDefault && len(namespace) > 0 && len(serviceName) > 0 {
name = fmt.Sprintf("%s.%s", name, namespace)
}

return Endpoint{
Host: name,
Ports: GetServicePorts(ports),
}
}

// Returns external endpoint name for the given service properties.
func getExternalEndpoint(ingress api.LoadBalancerIngress, ports []api.ServicePort) Endpoint {
var host string
if ingress.Hostname != "" {
host = ingress.Hostname
} else {
host = ingress.IP
}
return Endpoint{
Host: host,
Ports: GetServicePorts(ports),
}
}
38 changes: 38 additions & 0 deletions src/backend/resources/common/serviceport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2017 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import api "k8s.io/api/core/v1"

// ServicePort is a pair of port and protocol, e.g. a service endpoint.
type ServicePort struct {
// Positive port number.
Port int32 `json:"port"`

// Protocol name, e.g., TCP or UDP.
Protocol api.Protocol `json:"protocol"`

// The port on each node on which service is exposed.
NodePort int32 `json:"nodePort"`
}

// GetServicePorts returns human readable name for the given service ports list.
func GetServicePorts(apiPorts []api.ServicePort) []ServicePort {
var ports []ServicePort
for _, port := range apiPorts {
ports = append(ports, ServicePort{port.Port, port.Protocol, port.NodePort})
}
return ports
}
7 changes: 7 additions & 0 deletions src/backend/resources/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,10 @@ func NewObjectMeta(k8SObjectMeta metaV1.ObjectMeta) ObjectMeta {
Annotations: k8SObjectMeta.Annotations,
}
}

// NewTypeMeta creates new type mete for the resource kind.
func NewTypeMeta(kind ResourceKind) TypeMeta {
return TypeMeta{
Kind: kind,
}
}
74 changes: 74 additions & 0 deletions src/backend/resources/endpoint/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package endpoint

import (
"github.com/Qihoo360/wayne/src/backend/client"
"github.com/Qihoo360/wayne/src/backend/resources/common"

Comment thread
chengyumeng marked this conversation as resolved.
"k8s.io/api/core/v1"
)

type Endpoint struct {
ObjectMeta common.ObjectMeta `json:"objectMeta"`
TypeMeta common.TypeMeta `json:"typeMeta"`

// Hostname, either as a domain name or IP address.
Host string `json:"host"`

// Name of the node the endpoint is located
NodeName *string `json:"nodeName"`

// Status of the endpoint
Ready bool `json:"ready"`

// Array of endpoint ports
Ports []v1.EndpointPort `json:"ports"`
}

func GetServiceEndpointsFromCache(cache *client.CacheIndexer, namespace, name string) (list []Endpoint) {
return toEndpointList(GetEndpointsFromCache(cache, namespace, name))
}

// GetEndpoints gets endpoints associated to resource with given name.
func GetEndpointsFromCache(cache *client.CacheIndexer, namespace, name string) (endpointsList []v1.Endpoints) {
allEndpoints := cache.Endpoints.List()

for _, v := range allEndpoints {
endpoints, ok := v.(*v1.Endpoints)
if !ok {
continue
}
if endpoints.Namespace != namespace {
continue
}
if endpoints.Name != name {
continue
}
endpointsList = append(endpointsList, *endpoints)
}

return
}

func toEndpointList(endpoints []v1.Endpoints) (list []Endpoint) {
for _, endpoint := range endpoints {
for _, subSets := range endpoint.Subsets {
for _, address := range subSets.Addresses {
list = append(list, *toEndpoint(address, subSets.Ports, true))
}
for _, notReadyAddress := range subSets.NotReadyAddresses {
list = append(list, *toEndpoint(notReadyAddress, subSets.Ports, false))
}
}
}
return
}

func toEndpoint(address v1.EndpointAddress, ports []v1.EndpointPort, ready bool) *Endpoint {
return &Endpoint{
TypeMeta: common.NewTypeMeta(common.ResourceKind("endpoint")),
Host: address.IP,
Ports: ports,
Ready: ready,
NodeName: address.NodeName,
}
}
27 changes: 27 additions & 0 deletions src/backend/resources/service/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package service

import "github.com/Qihoo360/wayne/src/backend/resources/dataselector"

type ServiceCell Service

func (self ServiceCell) GetProperty(name dataselector.PropertyName) dataselector.ComparableValue {
switch name {
case dataselector.NameProperty:
return dataselector.StdComparableString(self.ObjectMeta.Name)
case dataselector.CreationTimestampProperty:
return dataselector.StdComparableTime(self.ObjectMeta.CreationTimestamp.Time)
case dataselector.NamespaceProperty:
return dataselector.StdComparableString(self.ObjectMeta.Namespace)
default:
// if name is not supported then just return a constant dummy value, sort will have no effect.
return nil
}
}

func toCell(std []Service) []dataselector.DataCell {
cells := make([]dataselector.DataCell, len(std))
for i := range std {
cells[i] = ServiceCell(std[i])
}
return cells
}
Loading