Java:多线程应用——多设备在线升级

技术点

  • 多线程
  • 数据结构
  • 状态管理

多线程

多线程最需要关注的就是数据,常规操作就是共享数据+锁,这种方式比较繁琐,也容易出错。现代多线程技术常用的是事件通知+值对象(不可变)的无锁模式,这种方式只要前期把任务分配好,数据结构设计好,一般是不会出问题的。Java自带的Future只提供了获取内容的方法,事件通知还需要自己去实现,Netty在此基础上做了个比较好用的事件监听。原理也很简单,每次提交任务时新建并注册一个Promise,等线程执行完毕设置结果时,查找注册表,调用void operationComplete(F future) throws Exception;方法通知监听者。Netty:执行命令带 Future返回)这样可以避免线程竞争,无需锁,没有线程上下文切换。

数据结构

操作数据的存取,对应命令数据的匹配。

状态管理

当前任务的状态,它是线程独有的,不可共享。

代码

下载入口方法

/**
    * 开始下载
    *
    * @param selectDevices 选择的设备
    */
   public void start(List<Map<String, String>> selectDevices) {
       selectDevices.forEach(sd -> {
           IReaderDrive drive = new R2000UpgradeDrive(sd.get("key"));

           BlockingDeque<byte[]> data = new LinkedBlockingDeque<>(ReadFile.DATA_PKT_QUEUE);
           try {
               switch (Mode.valueOf(String.valueOf(sd.get("mode")))) {
                   case NORMAL: // 常规模式:要先进入BOOTLOADER模式,再进入下载模式
                       drive.submit(new EnterBootloaderMode()).addListener(future -> {
                           TimeUnit.MILLISECONDS.sleep(500); // 不能发太快,设备反应不过来!
                           enterDownloadMode(drive, data);
                       });
                       break;
                   case BOOTLOADER: // BOOTLOADER模式:直接进入下载模式
                       enterDownloadMode(drive, data);
                       break;
               }
           } catch (DriveException e) {
               e.printStackTrace();
           }
       });
   }

参数是前端选择的升级设备列表,设备key(地址)和升级模式。

这里连接了设备IReaderDrive drive = new R2000UpgradeDrive(sd.get("key"));
准备了下载数据BlockingDeque<byte[]> data = new LinkedBlockingDeque<>(ReadFile.DATA_PKT_QUEUE);

这条线程至始至终都围绕着这个设备和这堆数据转,直至下载完毕或异常退出。
在这里插入图片描述

  • 模式一,先进入Bootloader模式,再进入下载模式
  • 模式二,直接进入下载模式

两者最终都进入下载模式 enterDownloadMode

/**
     * 进入下载模式
     *
     * @param data 数据
     * @throws DriveException 驱动异常
     */
    private void enterDownloadMode(IReaderDrive drive, BlockingDeque<byte[]> data) throws DriveException {
        drive.submit(new EnterDownloadMode()).addListener(future -> {
            // 第二次数据返回
            EnterDownloadModeEvent event = (EnterDownloadModeEvent) future.get();
            EnterDownloadModeResponse2 response = (EnterDownloadModeResponse2) event.getData();
            if (response.status[0] == 0 && response.status2[0] == 14) {
                // 开始下载(第一轮数据)
                Status status = new Status(drive.getDeviceKey());
                status.pktSize = data.size();
                status.startTimeMill = System.currentTimeMillis();
                this.downloadData(drive, status, data);
            }
        });
    }

这里进入下载模式返回两包数据,只监听了EnterDownloadModeResponse2 ,当第二包数据返回时,if (response.status[0] == 0 && response.status2[0] == 14)条件成立就开始下载升级包里的数据。添加状态,设置初始值。传递到下载方法this.downloadData(drive, status, data);


/**
 * 下载数据
 *
 * @param status 状态
 * @param data   数据
 * @throws DriveException 驱动异常
 */
private void downloadData(IReaderDrive drive, Status status, BlockingDeque<byte[]> data) throws DriveException {

    if (status.isMaxiDataLen) {
        log.warn("the address is wrong, no data is written!");
        return;
    }

    if (data.isEmpty()) {
        // 下发最后一包结束标识
        drive.submit(new DownloadData(FrameConstant.NONE_DATA)).addListener(future -> downloadDataResponse(drive, status, future, data));
        return;
    }

    // 文件读取的数据
    byte[] takeData = data.poll();

    // 下发的数据
    byte[] downloadData = new byte[FrameConstant.COMMON_DATA_LENGTH + takeData.length];

    // 添加 hdr,pkt_len数据长度
    FrameConstant.HDR[4] = (byte) (3 + takeData.length / 4);
    System.arraycopy(FrameConstant.HDR, 0, downloadData, 0, FrameConstant.HDR.length);

    // 最后一包数据长度是20
    if (status.addr == FrameConstant.LAST_PKT_DATA_LENGTH && (takeData.length > 20)) {
        status.isMaxiDataLen = true;
        log.error("error", new Exception("the file data length is wrong!"));
        DEVICE_INFO_CACHE.get(status.getKey()).setProgress("data length error!");
        return;
    }

    // 添加 abs_addr,转为小端模式
    byte[] add1Bytes = HexadecimalUtil.toByteArray(HexadecimalUtil.get16NumAdd0(status.addr, 8));
    ByteBuffer buffer = ByteBuffer.wrap(add1Bytes);
    buffer.order(ByteOrder.LITTLE_ENDIAN);

    byte[] absAddr = HexadecimalUtil.toByteArray(HexadecimalUtil.get16NumAdd0(buffer.getInt(), 8));
    System.arraycopy(absAddr, 0, downloadData, FrameConstant.HDR.length, absAddr.length);

    // 计算下一包的地址
    status.addr += takeData.length;

    // 添加 crc
    int destPos = FrameConstant.HDR.length + absAddr.length;
    System.arraycopy(FrameConstant.CRC_REPLACE, 0, downloadData, destPos, FrameConstant.CRC_REPLACE.length);

    // 添加 flags
    int destPos1 = destPos + FrameConstant.FLAGS.length;
    System.arraycopy(FrameConstant.FLAGS, 0, downloadData, destPos1, FrameConstant.FLAGS.length);

    // 添加 data
    int destPos2 = destPos1 + FrameConstant.FLAGS.length;
    System.arraycopy(takeData, 0, downloadData, destPos2, takeData.length);

    // 执行下发数据命令
    drive.submit(new DownloadData(Validate.replaceCrc(downloadData))).addListener(future -> downloadDataResponse(drive, status, future, data));

}

这里的参数都是从上往下穿递过来的,这里以后最好还是用上下文会比较好,省的传递一堆的数据。
第一个if (status.isMaxiDataLen)如果数据达到了设备的最大内存范围,禁止写入。第二个 if (status.addr == FrameConstant.LAST_PKT_DATA_LENGTH && (takeData.length > 20))判断最后一包数据是否合规,其他都是拼接数据帧操作。

下载数据返回监听.addListener(future -> downloadDataResponse(drive, status, future, data));

下载数据返回

/**
     * 下载数据返回
     * <p>
     * 太多数据传输,以后应该设计上下文模式
     *
     * @param status 状态
     * @param future 下载future
     * @param data   数据
     * @throws Exception DriveException InterruptedException ExecutionException
     */
    private void downloadDataResponse(IReaderDrive drive, Status status, Future<? super Object> future, BlockingDeque<byte[]> data) throws Exception {
        DownloadDataEvent event = (DownloadDataEvent) future.get();
        DownloadDataResponse eventData = event.getData();

        if (data == null) {
            return;
        }

        String key = event.getDevice().getKey();

        ByteBuffer buffer = ByteBuffer.wrap(eventData.status);
        buffer.order(ByteOrder.LITTLE_ENDIAN);

        int anInt = buffer.getInt();
        NvmemPktAck ack = NvmemPktAck.resolve(anInt);
        if (ack == null) {
            log.warn("undefined return status! ===> " + HexadecimalUtil.get16NumAdd0(anInt, 4).toUpperCase());
            return;
        }
        try {
            switch (ack) {
                case NVMEMUPD_STAT_UPD_SUCCESS:
                    // break;
                case NVMEMUPD_STAT_EXIT_SUCCESS:
                    // break;
                case NVMEMUPD_STAT_ENTRY_OK:
                    // log.info(ack.toString());
                    break;

                case NVMEMUPD_STAT_RXPKT_MAX:
                    // break;
                case NVMEMUPD_STAT_RX_TO:
                    // break;
                case NVMEMUPD_STAT_CRC_ERR:
                    // break;
                case NVMEMUPD_STAT_EXIT_NOWRITES:
                    // break;
                case NVMEMUPD_STAT_GEN_RXPKT_ERR:
                    // break;
                case NVMEMUPD_STAT_RESERVED_02:
                    // break;
                case NVMEMUPD_STAT_INT_MEM_BNDS:
                    // break;
                case NVMEMUPD_STAT_RESERVED_01:
                    // break;
                case NVMEMUPD_STAT_WR_FAIL:
                    // break;
                case NVMEMUPD_STAT_UNK_CMD:
                    // break;
                case NVMEMUPD_STAT_CMD_IGN:
                case NVMEMUPD_STAT_PKTLEN:
                case NVMEMUPD_STAT_EXIT_ERR:
                    log.warn(ack.toString());
                    break;
                case NVMEMUPD_STAT_MAGIC:
                    // 已经进入bootloader模式,再次发送命令2时返回的状态!
                    DEVICE_INFO_CACHE.get(key).setProgress("is MODE 2!");
                    log.warn(ack.toString());
                    return;

                default:
                    break;

            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 1下载中、2下载完成
        if (eventData.reCmd[0] == 2) {
            log.info(key + ": the upgrade is complete, time-consuming:" + (System.currentTimeMillis() - status.startTimeMill) + " ms");
            // 设置当前下载成功
            status.downloadComplete = true;
            DEVICE_INFO_CACHE.get(key).setProgress("100%");
            return;
        }

        // 前端显示进度条
        float progress = 100 - ((float) data.size() / (float) status.pktSize * 100);
        DEVICE_INFO_CACHE.get(key).setProgress(String.format("%.2f", progress) + "%");

        // 下一轮,继续
        downloadData(drive, status, data);
        status.downloadComplete = false;
    }

获取在线设备信息


/**
 * 前端获取设备信息
 */
public void refresh() {
    IReaderDrive.DEVICES.keySet().forEach(key -> {

        List<R2000BaseQuery> queries = new ArrayList<>();
        queries.add(new DeviceSNQuery(0));
        queries.add(new DeviceSoftwareInfoQuery(0));
        queries.add(new DeviceHardwareInfoQuery(0));
        queries.add(new ReadDeviceId());

        Map<String, Object> infos = new HashMap<>();
        infos.put("DEVICE_SN_INFO", "-");
        infos.put("DEVICE_SOFTWARE_INFO", "-");
        infos.put("DEVICE_HARDWARE_INFO", "-");

        this.getDeviceInfo(key, queries, infos);
    });
}

/**
 * 查询单个设备的信息
 *
 * @param deviceKey 设备ID
 * @param queries   查询项(SN最后查询,应放到第一项,设备ID第一个查询,应放到最后)
 * @param infos     设备信息
 */
private void getDeviceInfo(String deviceKey, List<R2000BaseQuery> queries, Map<String, Object> infos) {
    R2000BaseQuery query = queries.remove(queries.size() - 1);
    query.setDeviceKey(deviceKey);

    if (infos.containsKey("DEVICE_ADDR")) {
        query.setDeviceId((int) infos.get("DEVICE_ADDR"));
        infos.put("DEVICE_KEY", deviceKey);
    }

    BusHandler.submit(query).addListener(future -> {
        ReadDataEvent event = (ReadDataEvent) future.get();

        Optional<Object> deviceInfo = event.getData2();
        SubFunCode subFunCode = event.getSubFunCode();
        infos.put(subFunCode.name(), deviceInfo.get());

        if (subFunCode == SubFunCode.DEVICE_SN_INFO) {
            DEVICE_INFO_CACHE.put(deviceKey, new UpgradeDevice(infos));
            sendMsg(WSMessage.builder().data(DEVICE_INFO_CACHE.values()).method(Method.UPGRADE_REFRESH).build());
        } else {
            this.getDeviceInfo(deviceKey, queries, infos);
        }
    });
}

在线升级设备信息列表

public static final Map<String, UpgradeDevice> DEVICE_INFO_CACHE = new HashMap<>();

升级的设备

package com.xxx.sdk.valobj;

import com.xxx.sdk.protocol.r2000.upgrade.Mode;

import java.util.Map;
import java.util.Objects;

/**
 * 升级的设备
 *
 * @author Rubin
 * @version v1 2020/12/12 9:31
 */
public class UpgradeDevice {

    private final int id;
    private Object address;
    private Object software;
    private Object hardware;
    private Object sn;
    private Object progress;
    private Object mode;

    public UpgradeDevice(Map<String, Object> infos) {
        this.id = (int) infos.get("DEVICE_ADDR");
        this.address = infos.get("DEVICE_KEY");
        this.software = infos.get("DEVICE_SOFTWARE_INFO");
        this.hardware = infos.get("DEVICE_HARDWARE_INFO");
        this.sn = infos.get("DEVICE_SN_INFO");
        this.progress = "-";
        this.mode = Mode.NORMAL;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        UpgradeDevice that = (UpgradeDevice) o;
        return Objects.equals(software, that.software) &&
                Objects.equals(id, that.id) &&
                Objects.equals(hardware, that.hardware) &&
                Objects.equals(sn, that.sn);
    }

    @Override
    public int hashCode() {
        return Objects.hash(software, id, hardware, sn);
    }

	// get set
}

升级状态

package com.xxx	.sdk.upgrade;

import com.xxx.sdk.protocol.r2000.upgrade.frame.FrameConstant;
import com.xxx.sdk.Method;
import com.xxx.sdk.server.WSMessage;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static com.xxx.sdk.server.WsHandler.sendMsg;
import static com.xxx.sdk.upgrade.Upgrade.DEVICE_INFO_CACHE;

/**
 * 下载状态
 *
 * @author Rubin
 * @version v1 2020/12/12 17:42
 */
@Slf4j
public class Status {

    /**
     * 设备key
     */
    private String key;

    /**
     * 下载配置开关
     */
    private final boolean upgradeConfig = false;

    /**
     * 当前下载完毕
     */
    public boolean downloadComplete = false;

    /**
     * 是否到达最大数据范围
     */
    public boolean isMaxiDataLen = false;

    /**
     * 起始地址
     */
    public long addr = upgradeConfig ? FrameConstant.ADD2 : FrameConstant.ADD1;

    /**
     * 包大小
     */
    public int pktSize = 0;

    /**
     * 下载开始时间
     */
    public long startTimeMill = 0;

    public Status() {
        this.sendStatus();
    }

    public Status(String key) {
        this.key = key;
        this.sendStatus();
    }

    public String getKey() {
        return key;
    }
    
	/**
     * 发送升级状态(进度)
     */
    public void sendStatus() {
        Executors.newSingleThreadExecutor().execute(() -> {
            log.debug(key + " ==> start send status...");
            Map<String, Object> map = new HashMap<>(2);
            while (!downloadComplete) {
                map.put("key", key);
                map.put("value", DEVICE_INFO_CACHE.get(key).getProgress());
                sendMsg(WSMessage.builder().data(map).method(Method.UPGRADE_ONLINE_MONITORING).build());
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Status status = (Status) o;
        return Objects.equals(key, status.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key);
    }

}

效果

各设备间独自下载,互不影响。