大数据管理与监控:Cloudera Manager:Kafka消息队列管理_第1页
大数据管理与监控:Cloudera Manager:Kafka消息队列管理_第2页
大数据管理与监控:Cloudera Manager:Kafka消息队列管理_第3页
大数据管理与监控:Cloudera Manager:Kafka消息队列管理_第4页
大数据管理与监控:Cloudera Manager:Kafka消息队列管理_第5页
已阅读5页,还剩12页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据管理与监控:ClouderaManager:Kafka消息队列管理1大数据基础概念1.1大数据的定义与特征大数据是指无法在合理时间内用传统数据处理工具进行捕捉、管理和处理的数据集合。其特征通常被概括为“4V”:Volume(大量):数据量巨大,可能达到PB甚至EB级别。Velocity(高速):数据生成和处理速度极快,需要实时或近实时的处理能力。Variety(多样):数据类型多样,包括结构化、半结构化和非结构化数据。Value(价值):从海量数据中提取有价值的信息,但价值密度可能较低。1.2大数据处理流程大数据处理流程主要包括以下几个关键步骤:数据采集:从各种来源收集数据,如传感器、社交媒体、日志文件等。数据存储:使用分布式文件系统或数据库存储大量数据。数据处理:对数据进行清洗、转换和分析,可能使用批处理或流处理技术。数据分析:应用统计学、机器学习等方法,从数据中发现模式和趋势。数据可视化:将分析结果以图表或报告形式展示,便于理解和决策。1.3大数据技术栈简介大数据技术栈包括多种工具和技术,用于处理大数据的各个方面:Hadoop:一个开源框架,用于分布式存储和处理大数据集。它包括HDFS(分布式文件系统)和MapReduce(分布式计算模型)。Spark:一个快速、通用的集群计算系统,支持批处理、流处理和机器学习。Hive:一个数据仓库工具,用于查询和管理Hadoop中的结构化数据。Pig:一个用于数据分析的高级语言,简化了HadoopMapReduce的使用。Kafka:一个分布式流处理平台,用于构建实时数据管道和流应用。Zookeeper:一个分布式协调服务,用于管理和协调分布式应用中的服务。ClouderaManager:一个用于部署、管理、监控和维护Hadoop集群的工具。1.3.1示例:使用Spark进行数据处理#导入Spark相关库

frompysparkimportSparkConf,SparkContext

#初始化Spark环境

conf=SparkConf().setAppName("BigDataExample").setMaster("local")

sc=SparkContext(conf=conf)

#读取数据

data=sc.textFile("hdfs://localhost:9000/user/hadoop/data.txt")

#数据处理:计算单词频率

word_counts=data.flatMap(lambdaline:line.split(""))\

.map(lambdaword:(word,1))\

.reduceByKey(lambdaa,b:a+b)

#输出结果

word_counts.saveAsTextFile("hdfs://localhost:9000/user/hadoop/output")

#停止Spark环境

sc.stop()在这个例子中,我们使用Spark从HDFS中读取数据,然后进行单词频率的计算,并将结果保存回HDFS。这展示了大数据处理中常见的数据读取、处理和存储流程。1.3.2示例:Kafka消息队列管理#导入Kafka相关库

fromkafkaimportKafkaProducer

#初始化Kafka生产者

producer=KafkaProducer(bootstrap_servers='localhost:9092')

#发送消息到Kafka主题

producer.send('my-topic',b'some_message_bytes')

#确保所有消息被发送

producer.flush()

#关闭生产者

producer.close()此代码示例展示了如何使用Python的Kafka库向Kafka主题发送消息。Kafka作为一个消息队列,可以高效地处理和传输大量数据,是大数据实时处理的重要组成部分。通过以上介绍,我们了解了大数据的基本概念、处理流程以及常用的技术栈。这些知识为深入学习大数据管理和监控提供了坚实的基础。2Kafka消息队列概述2.1Kafka简介与工作原理Kafka是由Apache软件基金会开发的一个开源流处理平台,最初由LinkedIn公司创建并开源。它以一种高吞吐量、分布式、持久化的方式处理实时数据流,被广泛应用于日志收集、消息传递、流分析、网站活动跟踪等场景。Kafka的设计灵感来源于传统的消息队列,但其架构更接近于分布式文件系统,这使得Kafka能够提供比传统消息队列更高的数据吞吐量和更低的延迟。2.1.1工作原理Kafka的核心组件包括:生产者(Producer):负责发布消息到Kafka的Topic。消费者(Consumer):负责从Kafka的Topic中订阅并消费消息。Broker:Kafka集群中的服务器,负责处理Topic的读写请求。Topic:消息分类的逻辑容器,可以理解为一个队列。分区(Partition):每个Topic可以被分成多个分区,以实现数据的并行处理和高可用性。副本(Replica):为了提高数据的可靠性和可用性,Kafka会为每个分区创建多个副本,其中一个是Leader,其他为Follower。Kafka的生产者将消息发送到特定的Topic,消息会被追加到Topic的分区中。消费者通过订阅特定的Topic来消费消息,Kafka支持多个消费者并行消费,每个消费者可以独立地从Topic中读取消息。Kafka的Broker负责管理Topic的分区和副本,确保数据的持久性和一致性。2.1.2示例代码#生产者示例代码

fromkafkaimportKafkaProducer

producer=KafkaProducer(bootstrap_servers='localhost:9092')

producer.send('my-topic',b'some_message_bytes')

producer.flush()

producer.close()

#消费者示例代码

fromkafkaimportKafkaConsumer

consumer=KafkaConsumer('my-topic',

group_id='my-group',

bootstrap_servers='localhost:9092')

formessageinconsumer:

print("%s:%d:%d:key=%svalue=%s"%(message.topic,message.partition,

message.offset,message.key,

message.value))2.2Kafka在大数据生态系统中的角色Kafka在大数据生态系统中扮演着关键的角色,它作为数据流的枢纽,连接着数据的产生者和消费者。在Hadoop、Spark、Flink等大数据处理框架中,Kafka被用作数据的实时传输和存储层,使得这些框架能够处理实时数据流,而不仅仅是批处理数据。Kafka与ClouderaManager的结合,使得Kafka的部署、管理和监控变得更加简单和高效。ClouderaManager提供了Kafka的配置管理、服务监控、性能调优等功能,使得大数据工程师能够更加专注于数据处理逻辑,而不是基础设施的维护。2.3Kafka的关键特性和优势2.3.1关键特性高吞吐量:Kafka能够处理每秒数百万条消息的读写操作,这得益于其基于磁盘的存储机制和高效的文件系统操作。持久性:Kafka将消息存储在磁盘上,提供了数据的持久化,即使在Broker宕机的情况下,数据也不会丢失。可扩展性:Kafka支持水平扩展,可以通过增加Broker的数量来提高系统的处理能力。容错性:Kafka的分区和副本机制确保了数据的高可用性和容错性,即使部分Broker失效,数据仍然可以被访问和处理。低延迟:Kafka提供了低延迟的消息传递,使得实时数据处理成为可能。2.3.2优势实时处理:Kafka能够实时地处理和传输数据,这对于实时分析和监控系统至关重要。数据集成:Kafka可以作为数据集成的中心,连接各种数据源和数据处理系统,实现数据的统一管理和传输。消息重放:Kafka支持消息的重放,这对于数据的回溯分析和故障恢复非常有用。灵活的订阅模式:Kafka支持多种订阅模式,包括独占订阅、广播订阅等,使得消费者可以根据需要选择不同的订阅方式。社区支持:Kafka拥有活跃的开源社区,提供了丰富的文档、工具和插件,使得Kafka的使用和维护变得更加容易。通过以上介绍,我们可以看到Kafka在大数据生态系统中的重要地位,以及其在消息传递、数据集成、实时处理等方面的关键特性和优势。在实际应用中,合理利用Kafka的特性,可以极大地提高数据处理的效率和可靠性。3ClouderaManager简介3.1ClouderaManager的功能与优势ClouderaManager是一个全面的管理平台,用于部署、管理、监控和维护Hadoop集群。它提供了以下关键功能和优势:简化部署:通过图形界面或命令行工具,简化Hadoop及相关组件的部署过程。集中管理:提供一个统一的界面来管理Hadoop集群,包括配置、启动、停止服务等。监控与警报:实时监控集群的健康状况,提供性能指标和警报,帮助快速识别和解决问题。安全与合规:支持Kerberos、LDAP/AD等安全协议,确保数据安全和符合企业合规要求。升级与维护:简化Hadoop组件的升级过程,提供自动备份和恢复功能,减少维护工作量。3.2使用ClouderaManager管理Hadoop集群ClouderaManager通过其直观的用户界面和强大的API,使管理Hadoop集群变得简单。以下是如何使用ClouderaManager进行基本的集群管理操作:3.2.1部署Hadoop集群创建集群:在ClouderaManager中,选择“创建集群”,指定集群名称和版本。添加主机:从主机列表中选择要加入集群的主机。配置服务:选择要部署的服务,如HDFS、YARN、HBase等,并配置其参数。安装服务:完成配置后,启动安装过程,ClouderaManager将自动部署和配置服务。3.2.2监控集群性能查看仪表板:仪表板提供集群的总体视图,包括CPU使用率、内存使用率、磁盘使用率等关键指标。服务监控:每个服务都有详细的监控页面,显示其健康状态和性能指标。设置警报:可以为关键指标设置警报,当指标超出预设范围时,ClouderaManager会发送通知。3.2.3升级Hadoop组件选择版本:在ClouderaManager中,选择要升级到的Hadoop版本。备份配置:在升级前,ClouderaManager会自动备份当前的配置。执行升级:启动升级过程,ClouderaManager将自动升级所有相关组件。验证升级:升级完成后,检查集群状态和性能,确保一切正常。3.3ClouderaManager的安装与配置3.3.1安装ClouderaManager下载安装包:从Cloudera官方网站下载ClouderaManager的安装包。安装服务器:在一台主机上运行安装脚本,安装ClouderaManagerServer。安装代理:在所有集群主机上安装ClouderaManagerAgent。#安装ClouderaManagerServer

sudoshcloudera-manager-installer.bin

#安装ClouderaManagerAgent

sudoshcloudera-manager-agent-installer.bin3.3.2配置ClouderaManager配置数据库:ClouderaManager需要一个数据库来存储配置和监控数据,可以使用内置的SQLite数据库或外部的MySQL、PostgreSQL等。配置网络:确保所有主机可以访问ClouderaManagerServer,配置防火墙和网络设置。配置主机:添加集群中的主机,配置主机的网络地址和操作系统信息。#配置外部数据库

sudo/etc/init.d/cloudera-scm-server-dbstart

#配置ClouderaManagerServer

sudo/etc/init.d/cloudera-scm-serverstart通过以上步骤,可以有效地使用ClouderaManager来管理Hadoop集群,提高大数据平台的稳定性和效率。4在ClouderaManager中部署Kafka4.1Kafka服务的配置与安装4.1.1环境准备在开始部署Kafka之前,确保你的ClouderaManager环境已经安装并配置好。这包括:ClouderaManagerServer:运行在一台主机上,用于管理整个集群。ClouderaManagerAgent:运行在每台主机上,与ClouderaManagerServer通信,执行管理命令。4.1.2安装Kafka打开ClouderaManagerServer:在浏览器中输入ClouderaManagerServer的URL。添加Kafka服务:在ClouderaManager的“Services”页面,点击“AddService”,选择Kafka。配置服务:在“ConfigureService”步骤中,设置Kafka的基本配置,如kafka.broker.id,log.dirs等。选择主机:在“SelectHosts”步骤中,选择将要运行KafkaBroker的主机。安装服务:完成配置后,点击“ReviewandInstall”,然后点击“Install”。4.1.3配置KafkaKafka的配置文件通常位于/etc/kafka/conf/perties。在ClouderaManager中,你可以通过“Services”->“Kafka”->“Roles”->“Config”来修改配置。关键配置包括:kafka.broker.id:每个Broker的唯一ID。log.dirs:日志文件的存储目录。zookeeper.connect:Zookeeper服务的连接信息。4.2Kafka集群的设置与管理4.2.1集群设置在ClouderaManager中,Kafka集群的设置主要涉及:Zookeeper:Kafka依赖Zookeeper进行协调和管理。确保Zookeeper服务已经运行,并在Kafka配置中正确设置。Replication:配置主题的分区副本数,以提高数据的可靠性和可用性。Partition:设置主题的分区数,以提高并行处理能力。4.2.2集群管理创建主题:在“Services”->“Kafka”->“Topics”页面,点击“AddTopic”,输入主题名称和配置。管理Broker:在“Services”->“Kafka”->“Roles”页面,可以查看和管理每个Broker的状态和配置。监控Kafka:ClouderaManager提供了丰富的监控指标,包括Broker的健康状态、主题的生产者和消费者状态等。4.3Kafka监控与性能调优4.3.1监控KafkaClouderaManager提供了Kafka的监控面板,可以监控:Broker状态:包括CPU使用率、磁盘使用率、网络I/O等。主题状态:包括生产者和消费者的延迟、吞吐量等。Zookeeper状态:Kafka依赖Zookeeper,因此Zookeeper的健康状态也非常重要。4.3.2性能调优Kafka的性能调优主要涉及:调整Broker配置:例如,增加work.threads和num.io.threads可以提高Broker的处理能力。优化主题配置:例如,设置合理的retention.ms和segment.bytes可以平衡存储和性能。监控和调整:使用ClouderaManager的监控面板,定期检查Kafka的性能指标,根据需要进行调整。4.3.3示例:创建主题#使用Kafka命令行工具创建主题

#假设Kafka的bin目录在PATH中

kafka-topics.sh--create\

--topicmy-topic\

--zookeeperlocalhost:2181\

--replication-factor3\

--partitions6在ClouderaManager中,你不需要手动执行上述命令,而是通过图形界面进行操作。但是,理解命令行工具的使用对于深入理解Kafka的配置和管理非常有帮助。4.3.4示例:调整Broker配置在ClouderaManager中,你可以调整work.threads和num.io.threads来优化Broker的性能。例如,将work.threads从默认的3调整到10,将num.io.threads从默认的8调整到16。#KafkaBroker配置示例

work.threads=10

num.io.threads=164.3.5示例:优化主题配置调整主题的retention.ms和segment.bytes可以平衡存储和性能。例如,将retention.ms设置为86400000(一天),将segment.bytes设置为1073741824(1GB)。#Kafka主题配置示例

retention.ms=86400000

segment.bytes=1073741824通过以上步骤,你可以在ClouderaManager中成功部署、设置和管理Kafka集群,并进行性能调优。5Kafka消息队列管理5.1Kafka主题管理Kafka主题是消息的分类或馈送名称。每个消息(称为记录)都属于一个特定的主题。在ClouderaManager中管理Kafka主题,涉及到主题的创建、删除、查看和配置。5.1.1创建主题#使用Kafka命令行工具创建主题

#假设Kafka的Broker列表为kafka1:9092,kafka2:9092,kafka3:9092

#主题名为my_topic,分区数为3,副本因子为2

kafka-topics.sh--create\

--topicmy_topic\

--zookeeperzookeeper1:2181,zookeeper2:2181,zookeeper3:2181\

--partitions3\

--replication-factor25.1.2删除主题#删除主题

kafka-topics.sh--delete\

--topicmy_topic\

--zookeeperzookeeper1:2181,zookeeper2:2181,zookeeper3:21815.1.3查看主题#查看所有主题

kafka-topics.sh--list\

--zookeeperzookeeper1:2181,zookeeper2:2181,zookeeper3:21815.1.4配置主题主题的配置可以通过修改broker.config文件来实现,例如,可以设置log.retention.hours来控制日志的保留时间。5.2Kafka消费者与生产者管理Kafka的生产者负责发布消息到Kafka主题,而消费者则订阅主题并处理消息。5.2.1生产者示例fromkafkaimportKafkaProducer

#创建一个生产者实例

producer=KafkaProducer(bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092')

#发送消息到主题

producer.send('my_topic',b'some_message_bytes')

#确保所有消息都被发送

producer.flush()

#关闭生产者

producer.close()5.2.2消费者示例fromkafkaimportKafkaConsumer

#创建一个消费者实例

consumer=KafkaConsumer('my_topic',

group_id='my-group',

bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092')

#消费消息

formessageinconsumer:

print("%s:%d:%d:key=%svalue=%s"%(message.topic,message.partition,

message.offset,message.key,

message.value))5.3Kafka消息队列的备份与恢复Kafka的备份与恢复主要依赖于其日志文件的管理。Kafka的日志文件存储在磁盘上,可以通过复制这些文件来备份数据。5.3.1备份#备份主题my_topic的分区0

#假设Kafka的Broker地址为kafka1:9092

kafka-log-dirs.sh--bootstrap-serverkafka1:9092\

--command-config/path/to/perties\

--alter-configs\

--add-configlog.retention.hours=1\

--topicmy_topic\

--partitions0然后,可以使用scp或rsync等工具将日志文件复制到备份位置。5.3.2恢复Kafka的恢复通常涉及到将备份的日志文件复制回Kafka的Broker。然后,如果需要,可以重新配置主题的日志保留时间。#恢复主题my_topic的分区0

#假设Kafka的Broker地址为kafka1:9092

kafka-log-dirs.sh--bootstrap-serverkafka1:9092\

--command-config/path/to/perties\

--alter-configs\

--add-configlog.retention.hours=168\

--topicmy_topic\

--partitions0在实际操作中,备份和恢复可能需要更复杂的步骤,包括处理日志文件的同步、确保数据的一致性以及可能的重新分区等。以上示例和说明提供了在ClouderaManager环境中管理Kafka消息队列的基本操作,包括主题管理、生产者与消费者管理以及备份与恢复策略。通过这些操作,可以有效地管理和维护Kafka集群,确保数据的可靠性和服务的连续性。6Kafka监控与故障排除6.1Kafka监控指标解析Kafka的监控指标对于理解系统健康状况和性能至关重要。这些指标可以分为几大类,包括但不限于:BrokerMetrics:这些指标关注于Broker的性能,如消息的发送和接收速率、请求处理时间、磁盘使用情况等。TopicMetrics:提供关于特定主题的详细信息,如消息的生产速率、消费速率、滞后情况等。ConsumerMetrics:监控消费者组的健康,包括消费者组的滞后、消费者组的成员变化等。ProducerMetrics:跟踪生产者的行为,如生产消息的速率、重试次数、失败次数等。6.1.1示例:BrokerMetrics#KafkaBrokerMetrics示例

fromkafkaimportKafkaAdminClient

admin_client=KafkaAdminClient(bootstrap_servers='localhost:9092')

metrics=admin_client.describe_cluster()['controller']['metrics']

#打印Broker的请求处理时间

formetricinmetrics:

ifmetric['name']=='RequestHandlerAvgIdlePercent':

print(f"Broker请求处理平均空闲百分比:{metric['value']}")6.2使用ClouderaManager监控KafkaClouderaManager提供了一个集成的平台,用于管理、监控和控制Hadoop集群,包括Kafka。通过ClouderaManager,可以轻松地查看Kafka的实时监控数据,设置警报,以及进行故障排除。6.2.1设置警报在ClouderaManager中,可以为Kafka的特定指标设置警报,例如,当消息的生产速率低于某个阈值时,系统会自动发送通知。6.2.2查看监控数据ClouderaManager的仪表板提供了Kafka集群的概览,包括Broker、Topic和消费者组的状态。6.2.3故障排除当Kafka集群出现性能问题或故障时,ClouderaManager的监控数据和日志可以帮助定位问题。6.3Kafka常见故障与解决策略6.3.1故障1:消息丢失原因:可能是因为Broker的磁盘空间不足,导致日志被清理。解决策略:-增加Broker的磁盘空间。-调整log.retention.hours和log.retention.bytes配置,以控制日志的保留时间。6.3.2故障2:消费者滞后原因:消费者处理消息的速度慢于生产者发送消息的速度。解决策略:-增加消费者组中的消费者数量。-优化消费者处理逻辑,提高处理效率。6.3.3故障3:网络延迟原因:网络问题导致消息传输延迟。解决策略:-检查网络配置,确保网络连接稳定。-调整request.timeout.ms和socket.timeout.ms配置,以适应网络环境。6.3.4故障4:Broker不可用原因:Broker可能因为硬件故障、软件错误或配置问题而变得不可用。解决策略:-确保集群中有足够的Broker副本,以提高容错性。-定期检查Broker的硬件和软件状态。-使用ClouderaManager的监控功能,及时发现并解决Broker的问题。通过以上监控和故障排除策略,可以有效地管理和维护Kafka集群,确保其稳定运行和高效性能。7Kafka在实际场景中的应用7.1Kafka在实时数据流处理中的应用Kafka作为一款分布式消息系统,其在实时数据流处理中的应用尤为突出。它能够处理大量数据流,提供高吞吐量、低延迟的数据传输,是构建实时数据管道和流处理应用的理想选择。7.1.1原理Kafka通过将数据组织成主题(Topic)的形式,每个主题可以有多个分区(Partition),每个分区可以有多个副本(Replica),这样可以实现数据的高可用性和高并发处理。生产者(Producer)将数据发送到特定的主题,而消费者(Consumer)则订阅这些主题来消费数据。Kafka还支持数据的持久化存储,即使在系统重启后,数据也不会丢失。7.1.2示例假设我们有一个实时日志处理系统,需要从多个服务器收集日志数据,并实时分析这些数据以检测异常行为。我们可以使用Kafka来构建这个系统。代码示例:Kafka生产者fromkafkaimportKafkaProducer

importjson

#创建Kafka生产者

producer=KafkaProducer(bootstrap_servers='localhost:9092',

value_serializer=lambdav:json.dumps(v).encode('utf-8'))

#发送数据到Kafka主题

data={'server':'server1','log':'Error:connectiontimeout'}

producer.send('logs',value=data)

#确保所有数据被发送

producer.flush()

#关闭生产者

producer.close()代码示例:Kafka消费者fromkafkaimportKafkaConsumer

importjson

#创建Kafka消费者

consumer=KafkaConsumer('logs',

bootstrap_servers='localhost:9092',

value_deserializer=lambdam:json.loads(m.decode('utf-8')))

#消费数据

formessageinconsumer:

print(f"Receivedlogfromserver:{message.value['server']},log:{message.value['log']}")7.1.3描述在上述示例中,我们首先创建了一个Kafka生产者,用于将日志数据发送到名为logs的主题。数据被序列化为JSON格式,以便于传输和解析。接着,我们创建了一个Kafka消费者,订阅logs主题,消费并打印接收到的日志数据。通过这种方式,我们可以实时地收集和处理来自多个服务器的日志,实现异常检测和实时监控。7.2Kafka与数据湖的集成数据湖是一种存储大量原始数据的架构,而Kafka可以作为数据湖的实时数据源,将实时数据流直接写入数据湖,便于后续的数据分析和处理。7.2.1原理Kafka可以与数据湖集成,通过KafkaConnect框架,将Kafka中的数据流自动地写入数据湖中。KafkaConnect支持多种数据湖存储格式,如Parquet、Avro等,可以确保数据的高效存储和查询。7.2.2示例假设我们有一个数据湖,需要将实时的用户行为数据写入数据湖,以便于后续的分析和挖掘。代码示例:KafkaConnect配置name:user-behavior-to-data-lake

config:

connector.class:org.apache.kafka.connect.file.FileStreamSinkConnector

tasks.max:1

file:/path/to/data-lake/user-behavior.parquet

topic:user-behavior

format.class:org.apache.kafka.connect.storage.parquet.ParquetFormat

patibility:BACKWARD7.2.3描述在上述示例中,我们配置

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论