网站制作 建站,汽车网站开发思路,电影网站免费建设,做游戏交易网站有哪些内容在上一篇文章中 #xff0c;我们了解了mesos是什么#xff0c;它如何有用#xff0c;并开始使用它。 在本文中#xff0c;我们将看到如何在mesos上编写自己的框架。 #xff08;在mesos中#xff0c;框架是在其上运行的任何应用程序。#xff09;本文介绍了一个名为“ m… 在上一篇文章中 我们了解了mesos是什么它如何有用并开始使用它。 在本文中我们将看到如何在mesos上编写自己的框架。 在mesos中框架是在其上运行的任何应用程序。本文介绍了一个名为“ mesos-pinspider”的框架该框架获取用户的pinterest页面的用户配置文件信息和用户面板信息。 Mesos框架 通常Mesos框架具有三个基本组件。 将任务提交给框架的驱动程序 向主服务器注册要提供资源的调度程序 执行任务并在执行程序上运行它们 在从属节点上启动以执行框架任务的执行程序进程 Pinspider框架示例 您可以在github上检查代码。 让我们将其细分为PinDriverPinScheduler和Pin UserProfileExecutor。 司机 该框架的驱动程序组件是PinDriver。 创建执行人信息 使用Builder模式描述有关执行程序的信息而mesos使用Google协议缓冲区进行数据交换。 在这里我们需要设置executorID该命令基本上是一个shell命令通过以下命令执行/ bin / sh -c value。 在执行命令之前将获取指定的所有URI。 名称由setName设置。 来源由 setSource框架用来跟踪执行程序的源的标识符样式字符串。 当不同的执行者ID可能在语义上相关时这很有用。 Protos.ExecutorInfo userProfileExecutorInfo Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue(PinUserProfileExecutor)).setCommand(commandInfoUserProfile).setName(PinUserProfileExecutor Java).setSource(java).build(); 创建框架信息 描述框架信息。 用户字段用于确定执行程序/任务应以其启动的Unix用户。 如果用户字段设置为空字符串Mesos将自动将其设置为当前用户。 主机在删除框架之前等待调度程序进行故障转移的时间由以下方式指定 setFailoverTimeout。 框架的名称由setName设置 Protos.FrameworkInfo.Builder frameworkBuilder Protos.FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser().setName(Pinspider Framework); 实例化调度程序 您需要使用要提交的执行程序实例化任务数量来实例化调度程序。 Scheduler scheduler args.length 1 ?new PinScheduler(userProfileExecutorInfo,userBoardExecutorInfo) :new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]); 注意请注意使用了两个ExecutorInfo。 一个用于获取用户配置文件信息另一个用于获取用户板信息以进行演示。 此说明仅涉及一个executorinfo – userProfileExecutorInfo 启动mesos调度程序驱动程序。 MesosSchedulerDriver是SchedulerDriver的实现SchedulerDriver是将调度程序连接到mesos的抽象接口。 这是通过管理调度程序的生命周期启动停止和等待任务完成以及与Mesos交互启动任务终止任务等来完成的。 MesosSchedulerDriver schedulerDriver new MesosSchedulerDriver(scheduler,frameworkBuilder.build(), args[0]);int status schedulerDriver.run() Protos.Status.DRIVER_STOPPED ? 0 : 1;schedulerDriver.stop();System.exit(status); 执行器执行 框架的执行器组件是PinUserProfileExecutor。 执行程序是由框架的执行程序实现的回调接口。 在我们的实现中让我们专注于launchTask Override public void launchTask(final ExecutorDriver executorDriver
final Protos.TaskInfo taskInfo) { } 通过使用构建器模式设置ID和状态来设置任务状态。 Protos.TaskStatus taskStatus Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId())
.setState(Protos.TaskState.TASK_RUNNING).build(); 将状态更新发送到框架调度程序根据需要重试直到收到确认或执行程序终止为止在这种情况下将发送TASK_LOST状态更新。 executorDriver.sendStatusUpdate(taskStatus); 从任务中获取数据并运行逻辑。 try {message (userprofile : getUserProfileInfo(url)).getBytes();
} catch (IOException e) {LOGGER.error(Error parsing the Pinterest URL : e.getMessage());
} 向框架发送消息。 executorDriver.sendFrameworkMessage(message); 将任务的状态标记为已完成然后将状态更新发送到框架调度程序。 taskStatus Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_FINISHED).build();
executorDriver.sendStatusUpdate(taskStatus); main方法创建MesosExecutorDriver实例并运行 mesosExecutorDriver.run() Protos.Status.DRIVER_STOPPED ? 0 : 1 调度程序实施 该框架的Scheduler组件是Pin Scheduler。 调度程序是由框架的调度程序实现的回调接口。 在我们的实现中让我们专注于resourceOffersstatusUpdate和frameworkMessage 构造函数使用执行程序信息和启动任务数进行构造。 public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor , Protos.ExecutorInfo pinUserBoardExecutor ) {this(pinUserProfileExecutor,pinUserBoardExecutor, 5, http://www.pinterest.com/techcrunch);
} public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor,Protos.ExecutorInfo pinUserBoardExecutor, int totalTasks, String url) { this.pinUserProfileExecutor pinUserProfileExecutor;this.pinUserBoardExecutor pinUserBoardExecutor;this.totalTasks totalTasks; this.crawlQueue Collections.synchronizedList(new ArrayListString());this.crawlQueue.add(url);
} 资源报价 资源商品可以是CPU内存等资源。从商品列表中获取资源的标量值。 在设置任务信息时我们需要提供任务资源的需求。 for (Protos.Offer offer : list) {ListProtos.TaskInfo taskInfoList new ArrayListProtos.TaskInfo();double offerCpus 0; double offerMem 0;for (Protos.Resource resource : offer.getResourcesList()) {if (resource.getName().equals(cpus)) {offerCpus resource.getScalar().getValue();}else if (resource.getName().equals(mem)) {offerMem resource.getScalar().getValue();}}LOGGER.info(Received Offer : offer.getId().getValue() with cpus offerCpus and mem offerMem); 创建任务ID。 Protos.TaskID taskID Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks)).build(); 通过设置任务ID添加资源设置数据和设置执行程序来创建任务信息。 Protos.TaskInfo pinUserProfileTaskInfo Protos.TaskInfo.newBuilder().setName(task taskID.getValue()).setTaskId(taskID).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName(cpus).setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK))).addResources(Protos.Resource.newBuilder().setName(mem).setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK))).setData(ByteString.copyFromUtf8(crawlQueue.get(0))).setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor)).build(); 通过SchedulerDriver启动任务。 ...
taskInfoList.add(pinUserProfileTaskInfo);
taskInfoList.add(pinUserBoardTaskInfo);
}
schedulerDriver.launchTasks(offer.getId(), taskInfoList); 状态更新 当任务的状态已更改即从属丢失且任务丢失任务完成且执行者发送状态更新时调用此方法。 Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
...
} 如果任务完成请停止SchedulerDriver if (taskStatus.getState() Protos.TaskState.TASK_FINISHED) {finishedTasks;LOGGER.info(Finished tasks : finishedTasks);if (finishedTasks totalTasks) {schedulerDriver.stop();}} 如果任务被杀死丢失或失败则中止SchedulerDriver if (taskStatus.getState() Protos.TaskState.TASK_FAILED
|| taskStatus.getState() Protos.TaskState.TASK_KILLED
|| taskStatus.getState() Protos.TaskState.TASK_LOST) {LOGGER.error(Aborting because the task taskStatus.getTaskId().getValue() is in unexpected state : taskStatus.getState().getValueDescriptor().getName() with reason : taskStatus.getReason().getValueDescriptor().getName() from source : taskStatus.getSource().getValueDescriptor().getName() with message : taskStatus.getMessage());schedulerDriver.abort();
} 框架讯息 当执行程序发送消息时将调用此函数。 处理您的讯息 Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
Protos.SlaveID slaveID, byte[] bytes) {String data new String(bytes);System.out.println(data);LOGGER.info(User Profile Information : data);
} 此处提供完整的代码以及运行和输出示例的说明。 翻译自: https://www.javacodegeeks.com/2015/01/apache-mesos-writing-your-own-distributed-frameworks.html