场景

Kubernetes集群中,节点的内存使用情况至关重要。
内存使用过高可能导致Pod无法正常运行,从而影响整个集群的性能。
因此,实时监控节点的内存使用情况,并根据需要动态调整节点的调度策略,是保障集群稳定性的关键。


正文

程序概述

本文将使用一个简单的Go语言程序,并设置每10分钟监控一次Kubernetes集群中所有节点的内存使用情况。当某个节点的内存使用率达到90%时,程序会自动停止该节点的调度,待内存使用率降低后再恢复调度。同时,程序还会通过钉钉和邮件通知相关人员,以确保大家及时了解集群的状态。

程序功能

  1. 定时监控:每10分钟自动检查集群中所有节点的内存使用情况。
  2. 动态调度控制:当节点内存使用率超过90%时,自动停止该节点的调度;内存使用降低后,再次恢复调度。
  3. 通知机制:一旦节点状态发生变化,程序会通过钉钉和邮件发送通知,确保相关人员及时获得信息。

程序代码

package main

// 引入必要的包
import (
	"context"       // 用于创建上下文环境,常在API请求中使用
	"encoding/json" // 提供JSON的编码和解码功能
	"fmt"           // 实现格式化的I/O操作
	"log"
	"math" // 提供基本的数学函数
	"os"   // 提供系统级的文件操作和环境变量访问

	// 用于对文件路径进行操作
	"strconv" // 提供字符串与其他基本类型之间的转换功能
	"strings" // 提供简单的字符串操作功能
	"time"

	"github.com/wanghuiyt/ding" // 第三方包,用于发送钉钉消息
	mail "github.com/xhit/go-simple-mail/v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" // Kubernetes的元数据API
	"k8s.io/client-go/kubernetes"                 // Kubernetes的客户端库,用于与集群交互
	"k8s.io/client-go/tools/clientcmd"            // 用于处理Kubernetes配置文件

	// 用于获取用户的主目录路径
	"k8s.io/metrics/pkg/client/clientset/versioned" // Kubernetes的度量指标API
)

// 定义全局常量
const statusFilePath = "node_status.json" // 节点状态信息保存文件的路径
const Threshold = 5                       // 内存使用率的变化阈值设置为5%

// NodeStatus 结构体定义,用于保存节点的状态信息
type NodeStatus struct {
	MemoryUsagePercentage float64 // 节点内存使用率百分比
	Unschedulable         bool    // 节点是否停止调度的标志
}

func timeNow() string {
	// 获取当前时间。
	now := time.Now()
	// 根据这个模板,可以构造各种时间格式。
	formattedTime := now.Format("2006-01-02 15:04:05")
	// 输出格式化的时间,例如 "Formatted time: 2023-03-10 15:04:05"。
	return formattedTime
}

// 准备邮件内容,包括加粗和换行
func prepareHtmlMessage(originalMessage string) string {
	// 使用HTML标签构建消息内容
	// 在消息前添加"您好!",后面跟上"产生以下告警:",然后是加粗的消息内容,最后是"请关注!"
	htmlMessage := "您好!<br>产生以下告警:<br><b>" + originalMessage + "</b><br>请关注!"
	return htmlMessage
}

// dingSend 函数用于发送钉钉消息
func dingSend(message string) error {
	// 获取当前时间字符串
	currentTime := timeNow()
	// 将时间戳拼接到消息内容中
	dingmessage := fmt.Sprintf("时间:%s\n信息:\n%s", currentTime, message)
	// 初始化钉钉Webhook配置
	d := ding.Webhook{
		AccessToken: "钉钉AccessToken",    // 钉钉AccessToken
		Secret:      "钉钉Secret", // 钉钉Secret
	}
	// 调用SendMessageText方法发送文本消息
	err := d.SendMessageText(dingmessage)
	if err != nil {
		panic(err.Error()) // 发生错误时抛出异常
	}
	return nil
}

func mailSend(message string) error {
	// 配置SMTP服务器
	server := mail.NewSMTPClient()
	server.Host = "smtp.mxhichina.com"       // 设置SMTP服务器地址,我这里是阿里企业邮箱
	server.Port = 80                         // 设置SMTP服务器端口
	server.Username = "SMTP用户名"   // 设置发件人SMTP用户名(发件人邮箱地址)
	server.Password = "SMTP密码"      // 设置发件人SMTP密码(发件人邮箱密码)
	server.KeepAlive = false                 // 设置是否保持长连接
	server.ConnectTimeout = 10 * time.Second // 设置连接超时时间
	server.SendTimeout = 10 * time.Second    // 设置发送超时时间

	// 连接到SMTP服务器
	smtpClient, err := server.Connect()
	if err != nil {
		log.Fatal(err) // 如果连接失败,则记录错误并退出
	}

	// 创建邮件消息
	email := mail.NewMSG()
	subject := fmt.Sprintf("告警-K8S集群内存告警(%s)", timeNow())
	email.SetFrom("发件人邮箱地址"). // 设置发件人
						AddTo("收件人1邮箱地址").     // 添加收件人1
						AddCc("抄送人2邮箱地址"). // 添加抄送人2
						SetSubject(subject).              // 设置邮件主题
						SetListUnsubscribe(" ")           // 设置邮件退订链接,这里为空

	// 调用prepareHtmlMessage来格式化原始消息
	prepareHtmlMessage := prepareHtmlMessage(message)
	htmlMessage := strings.ReplaceAll(prepareHtmlMessage, "\n", "<br>") // 将换行符替换为HTML的换行标签
	if htmlMessage != "" {
		email.SetBody(mail.TextHTML, htmlMessage) // 设置邮件正文,使用HTML格式
	}

	// 设置DSN(Delivery Status Notification,投递状态通知)
	email.SetDSN([]mail.DSN{mail.SUCCESS, mail.FAILURE}, false)

	// 检查构建邮件过程中是否有错误
	if email.Error != nil {
		log.Fatal(email.Error) // 如果有错误,则记录并退出
	}

	// 发送邮件
	err = email.Send(smtpClient)
	if err != nil {
		log.Println(err) // 如果发送失败,记录错误
	} else {
		log.Println("邮件已发送") // 发送成功,记录提示信息
	}
	return nil
}

// readNodeStatusFromFile 函数从指定的文件中读取节点的状态信息
func readNodeStatusFromFile(filePath string) (map[string]NodeStatus, error) {
	// 打开文件进行读取操作
	file, err := os.Open(filePath)
	if err != nil {
		// 文件不存在时,返回空映射和nil错误
		if os.IsNotExist(err) {
			return make(map[string]NodeStatus), nil
		}
		// 其他错误时,返回nil映射和错误信息
		return nil, err
	}
	defer file.Close() // 确保在函数退出时关闭文件

	var status map[string]NodeStatus // 定义用于存储节点状态的映射
	// 使用JSON解码器从文件读取节点状态
	err = json.NewDecoder(file).Decode(&status)
	if err != nil {
		// 读取错误时,返回nil映射和错误信息
		return nil, err
	}
	return status, nil // 正常情况下返回节点状态和nil错误
}

// writeNodeStatusToFile 函数将节点的状态信息写入指定的文件
func writeNodeStatusToFile(filePath string, status map[string]NodeStatus) error {
	// 创建或覆盖文件进行写入操作
	file, err := os.Create(filePath)
	if err != nil {
		return err // 文件创建错误时,返回错误信息
	}
	defer file.Close() // 确保在函数退出时关闭文件

	// 创建JSON编码器并设置格式化缩进
	encoder := json.NewEncoder(file)
	encoder.SetIndent("", "  ")
	// 将节点状态编码为JSON格式写入文件
	return encoder.Encode(status)
}

func main() {
	// 获取Kubernetes配置文件的路径
	//kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
	// 定义Kubeconfig文件的绝对路径
	kubeconfig := "/root/.kube/config"

	// 基于配置文件建立Kubernetes客户端配置
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err.Error()) // 配置构建错误时抛出异常
	}

	// 创建Kubernetes客户端
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error()) // 客户端创建错误时抛出异常
	}

	// 创建Kubernetes度量指标客户端
	metricsClientset, err := versioned.NewForConfig(config)
	if err != nil {
		panic(err.Error()) // 度量指标客户端创建错误时抛出异常
	}

	// 获取Kubernetes集群中所有节点的列表
	nodes, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error()) // 节点列表获取错误时抛出异常
	}

	// 从文件中读取之前保存的节点状态
	previousStatus, err := readNodeStatusFromFile(statusFilePath)
	if err != nil {
		panic(err.Error()) // 节点状态读取错误时抛出异常
	}

	// 创建map来存储当前的节点状态
	currentStatus := make(map[string]NodeStatus)
	var message strings.Builder // 用于构建通知消息的字符串构建器

	// 遍历从Kubernetes集群获取的所有节点
	for _, node := range nodes.Items {
		// 检查节点名称是否包含"master",如果是则跳过此节点
		// 通常,主节点(master nodes)不用于调度普通工作负载
		if strings.Contains(node.Name, "master") {
			continue
		}

		// 获取节点的度量指标(例如,内存和CPU使用情况)
		metrics, err := metricsClientset.MetricsV1beta1().NodeMetricses().Get(context.Background(), node.Name, metav1.GetOptions{})
		if err != nil {
			// 如果获取度量指标时出错,则将错误信息添加到消息字符串,并继续下一个节点
			message.WriteString(fmt.Sprintf("查询节点信息发生错误 %s: %v\n", node.Name, err))
			continue
		}

		// 计算节点当前的内存使用量和总内存容量
		currentMemoryUsage := metrics.Usage.Memory().Value()         // 当前内存使用量
		totalMemoryCapacity := node.Status.Capacity.Memory().Value() // 总内存容量

		// 计算内存使用率并四舍五入到两位小数
		memoryUsagePercentage := math.Round((float64(currentMemoryUsage)/float64(totalMemoryCapacity)*100)*100) / 100
		formattedPercentage := fmt.Sprintf("%.0f", memoryUsagePercentage) // 格式化为百分比

		// 将格式化后的百分比转换回浮点数
		percentage, err := strconv.ParseFloat(formattedPercentage, 64)
		if err != nil {
			panic(err.Error()) // 转换出错时抛出异常
		}

		// 获取之前保存的节点状态,检查是否存在
		prevNodeStatus, exists := previousStatus[node.Name]
		// 判断节点是否需要设置为不可调度(当内存使用率超过90%时)
		unschedulable := percentage >= 90
		// 判断是否需要更新节点状态:当节点是新的或其调度状态发生变化时
		shouldUpdate := !exists || prevNodeStatus.Unschedulable != unschedulable

		// 如果需要更新节点状态
		if shouldUpdate {
			// 更新节点的调度状态
			node.Spec.Unschedulable = unschedulable
			// 向Kubernetes API发送更新请求
			_, err = clientset.CoreV1().Nodes().Update(context.Background(), &node, metav1.UpdateOptions{})
			if err != nil {
				panic(err.Error()) // 更新失败时抛出异常
			}

			// 构造消息字符串,包含节点名称、内存使用率和调度状态
			status := "已启用调度"
			if unschedulable {
				status = "已停止调度"
			}
			message.WriteString(fmt.Sprintf("节点名称: %s,内存使用率: %.0f%%,%s\n", node.Name, percentage, status))
		}

		// 更新当前节点状态的映射
		currentStatus[node.Name] = NodeStatus{
			MemoryUsagePercentage: percentage,
			Unschedulable:         unschedulable,
		}
	}

	// 如果有节点状态发生变化,则发送消息
	if message.Len() > 0 {
		fmt.Print(message.String()) // 在控制台打印消息内容

		// 发送钉钉消息
		errDingSend := dingSend(message.String())
		if errDingSend != nil {
			fmt.Printf("钉钉消息发送失败: %v\n", errDingSend)
		}

		// 发送邮件信息
		errMailSend := mailSend(message.String())
		if errMailSend != nil {
			fmt.Printf("邮件发送失败: %v\n", errMailSend)
		}
	}

	// 将当前的节点状态写入文件
	if err := writeNodeStatusToFile(statusFilePath, currentStatus); err != nil {
		panic(err.Error()) // 节点状态写入错误时抛出异常
	}
}

如何应用

设置为系统启动项,10分钟运行一次

vim /usr/lib/systemd/system/node-monitor.service
[Install]
WantedBy=multi-user.target

[Unit]
Description=Node Monitor Service
After=network.target

[Service]
User=root
WorkingDirectory=/data/go
ExecStart=/data/go/Node_Monitor
# 总是自动重启,执行完等待10分钟重启一次
Restart=always
RestartSec=600

[Install]
WantedBy=multi-user.target

systemctl daemon-reload
systemctl start node-monitor.service

效果展示

image.png

image.png

image.png


总结

通过这个程序,能够有效监控Kubernetes集群的节点内存使用情况,并根据实时数据动态调整调度策略,从而提升集群的稳定性和可用性。定期的内存监控和及时的调度调整,对于保障生产环境的正常运行至关重要。


文章作者: Runfa Li
本文链接:
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Linux 小白鼠
GO Linux Kubernetes Monitor go Linux kubernetes node 监控 容器监控 状态监控 monitor
觉得文章不错,打赏一点吧,1分也是爱~
打赏
微信 微信
支付宝 支付宝