summary refs log blame commit diff homepage
path: root/src/nxt_unit.c
blob: 9696e9cd8e6aa7484c987d17fe7206462178755d (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
3599
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
3709
3710
3711
3712
3713
3714
3715
3716
3717
3718
3719
3720
3721
3722
3723
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
3775
3776
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
3942
3943
3944
3945
3946
3947
3948
3949
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959
3960
3961
3962
3963
3964
3965
3966
3967
3968
3969
3970
3971
3972
3973
3974
3975
3976
3977
3978
3979
3980
3981
3982
3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013
4014
4015
4016
4017
4018
4019
4020
4021
4022
4023
4024
4025
4026
4027
4028
4029
4030
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058
4059












                                


                               




                           









                                                                                

                                                               
                                                  
                                               




                                                                         



                                                                           





                                                            



                                                                          



                                                                               





                                                                        


                                                                              





















                                                                               
                                   







                                                                            























                                                                             




                                                                





                                                                                


                                    
                                  
                                    






                                       





                                                   



                                    
                                
                                      

                                           







                                     
                         





                                     





                                           

                                   
                                       






                                          












                                            


                                      

                                        




                                               
                                            



                                           


                                             


                                             


                                           


















































































































































































                                                                            



                                                                










































                                                        
          


                                                                      

            




                                                           






                                                                    
                                        
                                       

                                          



                                                                         





                                                                    
                                

                       











































                                                         






































































































                                                                             






                                    



                                                             
                     








                                                
                                                         

     
                                 
 




                                                                           





                                               






















                                                                            
                                                                















                                                                  

                                                       
 



                                                             
 



                                                                     



                      

                                                                     
 


                                                          
 


                                                        
 




                                                                              



                      
                                                  
 

                                                         
 
                                 
 
                         

              


                                                                  
 

                  
 
     
 


                            
 


                                                      
 


                                                        
 

              
 
 






                                                                             
 



                                                                           
 

                              
 


                                                                     
 

                              
 
                                   
 


                                                             
 
           
 


                                                                                
 

                              
 

                                                          
 


                                   
 
                      
 
                                                             
 

                                                   
 
 







                                                                                
 


                                                                      
 

                              
 



                                                                            
 

                              
 



                                                                        
 

                              
 
                         
 

                                                             
 
                                   
 
                               
 














                                                                    

     
                             
 





                                                              

     







                                                           
 






























                                                                                

     

































                                                                               


                                                                         



































                                                                    

     













                                                       














                                                               

                                         
                                                  

                                               


                                                              





                                       

                                             








                                                                             

                                           








                                                                              
                                            







                                                                        

                                                                             
 

                                
 


                                                       
 


                                                       
 









                                                                        

                                         


                                                                
 

                                           
                                           







                                                                  


















                                                                             

                                         
                                                 

                                               

                                                                  






                                                  

                                               



























                                                                               

                                                  
                                                                       

                                                    

 
 



                                                                       
 
                  
























                                                                 













                                                               














































































































































                                                                             

                                                           

                                                     
                                                                     













                                                                 
                                        




                                            
                                                                  

                                                       



                                                                        




                                                                           
                    


                                                                             
                    













                                                                 



                                                                           



































































                                                                                 
                                                    















                                                                      
                        


                                                           
                        













































































                                                                            



                                                                            






                                                                             
                                                                         


























                                                                        













                                                                        

                                               

                                                                              




                        
                                                                     
 

                                                                        






















                                                                            
                                                                     




                                                              
                                                             








                                          




                                                               

                                         
                                     

                                               

                                                       
                        


            
                                      
 
                                           

                                               










                                                        

                                       

                                                   
                                                                      

                                                     













































































                                                                           
 


                                                                                































                                                                         
                                                                             























                                                                        
                                                                         





























                                                             
                        







                                             
                                                    



                       



                                                                                 







                                                               
                                                                          




                                            
                                                                      

















                                                                              

                                                                            
 
 






                                                                       







                                        
                                   

                                                               
 
                                 


                    
                                























                                                                        

























                                                                        


                                                       


                                                                      






                                                              
                                                                              

































































                                                                            


                                                            



































                                                                             







                                                                            






                                     
             




















                                             
             


                       
                 



















































                                                                           
                                  




















                                                                    



























































































































































                                                                                







































































                                                                            
                          

                
                                


















































































































































































                                                                                




                                                                             
                                   



























































































































































































































                                                                               
                                                              





















                                                                          

                                                           




                                  
                                                                    


                                                             
                                         










                                                                
                                                                      



                                     
                                                




                                                                           
                                                              











                                                          

                                            








                                                                          

                                                               







                                                                   







                                                           

                                                                               





                                                    

                                                   





                                         
                                                                               
                                        


                                                               
                                             





















































































































































































                                                                               
   


























                                                                           




                                 























                                                                         

                                        

                                                  

                                              





                                              

                                       








































                                                                           







                                                      









                                        




                                               












                                                                        

                                                    
 


                                           
                       
     







                                                      






                                                         

                                            


























































































































































                                                                               
                                   








































































































































































































































































































































































































































































































                                                                              












































































                                                                           







































































                                                                               
                                                                    




















































































                                                                           

/*
 * Copyright (C) NGINX, Inc.
 */

#include <stdlib.h>

#include "nxt_main.h"
#include "nxt_port_memory_int.h"

#include "nxt_unit.h"
#include "nxt_unit_request.h"
#include "nxt_unit_response.h"
#include "nxt_unit_websocket.h"

#include "nxt_websocket.h"

#if (NXT_HAVE_MEMFD_CREATE)
#include <linux/memfd.h>
#endif

/*
 * Forward typedefs for the implementation-private structures of this file.
 * They are declared up front so that the prototypes and the structure
 * definitions below can refer to each other regardless of definition order.
 */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;

/*
 * Prototypes of the file-local (static / nxt_inline) helpers.  The group
 * comments below only restate what the function names say; see each
 * definition for the actual contract.
 */

/* Library/context setup, buffer list helpers and incoming message handlers. */
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
    nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
    size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
    nxt_chunk_id_t *c, int n);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
    nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);

/* nxt_unit_mmaps_t lifecycle, process reference counting, mmap reads. */
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, int i);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
    uint32_t size);

/* Process lookup by pid and port creation. */
static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
    pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
    pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, int *fd);

/* Sending a port (with its file descriptor) to another port. */
static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd);

/*
 * Port and process removal.
 * NOTE(review): the "_unsafe" suffix presumably means the caller must
 * already hold the relevant lock; confirm at the definition.
 */
static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
    nxt_unit_process_t **process);
static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process);

/* Default port send/receive routines (payload buffer plus "oob" buffer). */
static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
    const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
    void *oob, size_t oob_size);

/* Port hash helpers (nxt_lvlhsh_t keyed by nxt_unit_port_id_t). */
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_t *port);
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_id_t *port_id, int remove);

/* Request hash helpers (nxt_lvlhsh_t looked up by stream number). */
static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl);
static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
    nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);

/* Writes a prefix into [p, end); presumably the log line prefix -- confirm. */
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);


/*
 * Implementation-private buffer: the public nxt_unit_buf_t followed by
 * list linkage and bookkeeping pointers.
 *
 * "buf" is the first member.
 * NOTE(review): presumably so that a nxt_unit_buf_t pointer handed to the
 * application can be converted back to the enclosing structure; confirm
 * against the users of this type.
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;

    /*
     * List linkage.  "prev" is a pointer to a pointer.
     * NOTE(review): presumably it holds the address of the previous
     * element's "next" field (or of the list head); confirm in
     * nxt_unit_mmap_buf_insert() and nxt_unit_mmap_buf_remove().
     */
    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    /* NOTE(review): presumably the shared memory segment backing "buf". */
    nxt_port_mmap_header_t   *hdr;
    nxt_unit_port_id_t       port_id;
    /* Associated request and owning context, as far as the types tell. */
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
};


/*
 * Description of one received message, passed to the
 * nxt_unit_process_*() and nxt_unit_*_read() handlers declared above.
 *
 * NOTE(review): the per-field notes below are inferred from names and
 * types only; confirm against the code that fills this structure.
 */
struct nxt_unit_recv_msg_s {
    /* Message identification: stream number, sender pid, reply port id. */
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    /* Flags; each holds a single-bit value stored in a full byte. */
    uint8_t                  last;      /* 1 bit */
    uint8_t                  mmap;      /* 1 bit */

    /* Payload location and length in bytes (presumably). */
    void                     *start;
    uint32_t                 size;

    /* File descriptor associated with the message, and the peer process. */
    int                      fd;
    nxt_unit_process_t       *process;

    /* Head of the nxt_unit_mmap_buf_t list attached to this message. */
    nxt_unit_mmap_buf_t      *incoming_buf;
};


/*
 * Request states kept in nxt_unit_request_info_impl_t.state.
 *
 * The enumerators are consecutive integers starting from
 * NXT_UNIT_RS_START = 0.
 * NOTE(review): the declaration order presumably matches the order in
 * which a request passes through the states, which would allow ordered
 * comparisons; confirm against the code using "state".
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


/*
 * Implementation-private request: the public nxt_unit_request_info_t
 * (first member) followed by internal bookkeeping and a trailing
 * variable-length data area.
 */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    /* Stream number; also the lookup key of nxt_unit_request_hash_find(). */
    uint32_t                 stream;

    nxt_unit_process_t       *process;

    /* Heads of the outgoing and incoming nxt_unit_mmap_buf_t lists. */
    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    /* NOTE(review): presumably a boolean "this is a websocket request". */
    uint8_t                  websocket;

    /*
     * Queue linkage.
     * NOTE(review): presumably for the free_req / active_req queues of
     * nxt_unit_ctx_impl_t, which are documented as holding this type.
     */
    nxt_queue_link_t         link;

    /*
     * Flexible array member; it must remain the last member.
     * NOTE(review): its size is presumably governed by
     * nxt_unit_impl_t.request_data_size; confirm at the allocation site.
     */
    char                     extra_data[];
};


/*
 * Implementation-private websocket frame: the public
 * nxt_unit_websocket_frame_t (first member) plus internal bookkeeping.
 */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    /* Buffer (list) associated with the frame. */
    nxt_unit_mmap_buf_t         *buf;

    /*
     * Queue linkage.
     * NOTE(review): presumably for nxt_unit_ctx_impl_t.free_ws, which is
     * documented as a queue of this type.
     */
    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;

    /*
     * NOTE(review): untyped pointer; presumably a private copy of the
     * frame data made when the frame is retained.  Confirm ownership and
     * who frees it.
     */
    void                        *retain_buf;
};


/*
 * Private per-context state wrapping the public nxt_unit_ctx_t.
 *
 * "req" must remain the last member: it ends with a flexible array and the
 * enclosing allocation is enlarged by request_data_size to back it.
 */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    /* Guards the free_req, free_ws and active_req queues. */
    pthread_mutex_t               mutex;

    nxt_unit_port_id_t            read_port_id;
    int                           read_port_fd;     /* -1 after init */

    /* Links the context into nxt_unit_impl_t.contexts. */
    nxt_queue_link_t              link;

    /* List of unused buffer descriptors, seeded from ctx_buf[]. */
    nxt_unit_mmap_buf_t           *free_buf;

    /* Queue of idle nxt_unit_request_info_impl_t objects. */
    nxt_queue_t                   free_req;

    /* Queue of idle nxt_unit_websocket_frame_impl_t objects. */
    nxt_queue_t                   free_ws;

    /* Queue of nxt_unit_request_info_impl_t objects currently in use. */
    nxt_queue_t                   active_req;

    /* Hash of nxt_unit_request_info_impl_t, looked up by stream. */
    nxt_lvlhsh_t                  requests;

    /* Embedded descriptors placed on free_buf during initialization. */
    nxt_unit_mmap_buf_t           ctx_buf[2];

    /* Embedded request object placed on free_req during initialization. */
    nxt_unit_request_info_impl_t  req;
};


/*
 * Private library instance wrapping the public nxt_unit_t.
 *
 * main_ctx must remain the last member: its trailing request object ends
 * with a flexible array, and the instance is allocated with
 * request_data_size extra bytes to back it.
 */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;
    nxt_unit_callbacks_t     callbacks;        /* copy, with defaults set */

    /* Size of the per-request application data area. */
    uint32_t                 request_data_size;

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_id_t       ready_port_id;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    pid_t                    pid;              /* getpid() at creation */
    int                      log_fd;           /* STDERR_FILENO by default */
    int                      online;           /* set to 1 at creation */

    nxt_unit_ctx_impl_t      main_ctx;
};


/* Private wrapper around the public nxt_unit_port_t. */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t          port;

    /* NOTE(review): presumably links into nxt_unit_process_t.ports. */
    nxt_queue_link_t         link;
    nxt_unit_process_t       *process;
};


/* One shared memory segment, referenced through its header. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
};


/*
 * Array of shared memory segments with its own mutex.
 * NOTE(review): size/cap are presumably the used and allocated element
 * counts of elts - confirm against the code that grows the array.
 */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;
    uint32_t                 cap;
    nxt_unit_mmap_t          *elts;
};


/* Per-process bookkeeping: its ports and shared memory segments. */
struct nxt_unit_process_s {
    pid_t                    pid;

    nxt_queue_t              ports;

    /* Shared memory segments, kept separately for each direction. */
    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    nxt_unit_impl_t          *lib;

    /* Reference counter; adjusted through nxt_unit_process_use(). */
    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};


/*
 * Port identifier in the form used as a hash key.  Both members are
 * explicitly 32 bits wide to avoid possible alignment padding inside the
 * structure.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Creates a library instance, registers the "ready" and "read" ports and
 * sends the READY message.
 *
 * Port parameters are taken from "init" when ready_port.id.pid,
 * ready_stream and read_port.id.pid are all non-zero; otherwise they are
 * parsed from the environment by nxt_unit_read_env().
 *
 * Returns the main context on success or NULL on failure.
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc;
    uint32_t         ready_stream;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        /* Re-initialize the copied ids from their own pid/id values. */
        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);
    } else {
        rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
                               &ready_stream);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }
    }

    ctx = &lib->main_ctx.ctx;

    rc = lib->callbacks.add_port(ctx, &ready_port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to add ready_port");

        goto fail;
    }

    rc = lib->callbacks.add_port(ctx, &read_port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    lib->main_ctx.read_port_id = read_port.id;
    lib->ready_port_id = ready_port.id;

    rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    return ctx;

fail:

    /*
     * A non-NULL result of nxt_unit_create() has both the main context
     * mutex and the library mutex initialized; destroy them before the
     * memory that holds them is released.
     */
    pthread_mutex_destroy(&lib->main_ctx.mutex);
    pthread_mutex_destroy(&lib->mutex);

    free(lib);

    return NULL;
}


/*
 * Allocates and initializes a library instance together with its main
 * context.  Callbacks left NULL by the caller are replaced by the built-in
 * defaults; request_handler is mandatory.
 *
 * The allocation is enlarged by init->request_data_size because the main
 * context ends with an embedded request object that has a flexible array.
 *
 * Returns the new instance or NULL on failure.
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    /* Validate first: a rejected configuration then needs no cleanup. */
    if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
        nxt_unit_alert(NULL, "request_handler is NULL");

        return NULL;
    }

    lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
    if (nxt_slow_path(lib == NULL)) {
        nxt_unit_alert(NULL, "failed to allocate unit struct");

        return NULL;
    }

    rc = pthread_mutex_init(&lib->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        goto fail_free;
    }

    lib->unit.data = init->data;
    lib->callbacks = init->callbacks;

    lib->request_data_size = init->request_data_size;

    lib->processes.slot = NULL;
    lib->ports.slot = NULL;

    lib->pid = getpid();
    lib->log_fd = STDERR_FILENO;
    lib->online = 1;

    nxt_queue_init(&lib->contexts);

    rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail_mutex;
    }

    cb = &lib->callbacks;

    if (cb->add_port == NULL) {
        cb->add_port = nxt_unit_add_port;
    }

    if (cb->remove_port == NULL) {
        cb->remove_port = nxt_unit_remove_port;
    }

    if (cb->remove_pid == NULL) {
        cb->remove_pid = nxt_unit_remove_pid;
    }

    if (cb->quit == NULL) {
        cb->quit = nxt_unit_quit;
    }

    if (cb->port_send == NULL) {
        cb->port_send = nxt_unit_port_send_default;
    }

    if (cb->port_recv == NULL) {
        cb->port_recv = nxt_unit_port_recv_default;
    }

    return lib;

fail_mutex:

    /* The context failed to initialize, so only lib->mutex exists here. */
    pthread_mutex_destroy(&lib->mutex);

fail_free:

    free(lib);

    return NULL;
}


/*
 * Initializes a context: its mutex, the request/websocket queues, the free
 * buffer list (seeded with the two embedded descriptors) and the embedded
 * request object, then links the context into lib->contexts.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR when the mutex cannot be
 * initialized; in the latter case the context is not linked anywhere.
 */
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
    void *data)
{
    int  rc;

    ctx_impl->ctx.data = data;
    ctx_impl->ctx.unit = &lib->unit;

    rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        return NXT_UNIT_ERROR;
    }

    /*
     * Publish the context only after the sole fallible step succeeded, so
     * a failed initialization never leaves a link to the context behind.
     */
    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);

    nxt_queue_init(&ctx_impl->free_req);
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);

    /* Inserted in reverse so that ctx_buf[0] ends up at the list head. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port_fd = -1;
    ctx_impl->requests.slot = NULL;

    return NXT_UNIT_OK;
}


/*
 * Pushes mmap_buf in front of the list whose first-element pointer is
 * *head.  The previous first element, if any, gets its back reference
 * redirected to the new element's "next" field.
 */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  *old_first;

    old_first = *head;
    mmap_buf->next = old_first;

    if (old_first != NULL) {
        old_first->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/*
 * Appends mmap_buf to the list starting at *prev: walks to the terminating
 * NULL "next" pointer and inserts the element at that slot.
 */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **slot;

    for (slot = prev; *slot != NULL; slot = &(*slot)->next) {
        /* Advance to the end of the list. */
    }

    nxt_unit_mmap_buf_insert(slot, mmap_buf);
}


/*
 * Unlinks mmap_buf from its list: the successor inherits the back
 * reference and the pointer that referred to mmap_buf is redirected to the
 * successor.  The element's own next/prev fields are left untouched.
 */
nxt_inline void
nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  *following, **back;

    back = mmap_buf->prev;
    following = mmap_buf->next;

    if (following != NULL) {
        following->prev = back;
    }

    if (back != NULL) {
        *back = following;
    }
}


/*
 * Reads the port configuration from the NXT_UNIT_INIT_ENV environment
 * variable.  The expected value is
 *
 *   <version>;<stream>;<pid>,<id>,<fd>;<pid>,<id>,<fd>;<log_fd>
 *
 * where <version> must equal NXT_VERSION exactly, the first pid/id/fd
 * triple describes the "ready" port (used for output) and the second one
 * the "read" port (used for input).
 *
 * Returns NXT_UNIT_OK with ready_port, read_port, *log_fd and *stream
 * filled in, or NXT_UNIT_ERROR when the variable is missing, the version
 * differs or the value cannot be parsed.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
    int *log_fd, uint32_t *stream)
{
    int       rc;
    int       ready_fd, read_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, read_pid;
    uint32_t  ready_stream, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

    version_length = nxt_length(NXT_VERSION);

    /* The text before the first ';' must be exactly NXT_VERSION. */
    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    /* SCN* are the <inttypes.h> macros meant for the scanf family. */
    rc = sscanf(version_end + 1,
                "%"SCNu32";"
                "%"SCNd64",%"SCNu32",%d;"
                "%"SCNd64",%"SCNu32",%d;"
                "%d",
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &read_pid, &read_id, &read_fd,
                log_fd);

    /* All eight fields are mandatory. */
    if (nxt_slow_path(rc != 8)) {
        nxt_unit_alert(NULL, "failed to scan variables");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    read_port->in_fd = read_fd;
    read_port->out_fd = -1;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/*
 * Sends a _NXT_PORT_MSG_PROCESS_READY message carrying the given stream
 * number and the library's pid to port_id through the port_send callback.
 *
 * Returns NXT_UNIT_OK when the whole message was sent, NXT_UNIT_ERROR
 * otherwise.
 */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream)
{
    ssize_t          sent;
    nxt_port_msg_t   ready_msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    ready_msg.stream = stream;
    ready_msg.pid = lib->pid;
    ready_msg.reply_port = 0;
    ready_msg.type = _NXT_PORT_MSG_PROCESS_READY;

    /* A single, complete, plain (non-mmap, non-tracked) message. */
    ready_msg.last = 1;
    ready_msg.mmap = 0;
    ready_msg.nf = 0;
    ready_msg.mf = 0;
    ready_msg.tracking = 0;

    sent = lib->callbacks.port_send(ctx, port_id, &ready_msg,
                                    sizeof(ready_msg), NULL, 0);

    return (sent == sizeof(ready_msg)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}


/*
 * Decodes one received port message and dispatches it by type.
 *
 * buf/buf_size hold the message (nxt_port_msg_t header followed by the
 * payload); oob/oob_size hold optional ancillary data that may carry a
 * file descriptor.
 *
 * Whatever the outcome, the function finishes through the "fail" label,
 * which also serves as the common cleanup for successful paths: a
 * descriptor that no handler took over is closed, buffers still attached
 * to the message are freed and a remaining process reference is dropped.
 *
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR (or the handler's result).
 */
int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   rc;
    pid_t                 pid;
    struct cmsghdr        *cm;
    nxt_port_msg_t        *port_msg;
    nxt_unit_impl_t       *lib;
    nxt_unit_recv_msg_t   recv_msg;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Pessimistic default; handlers overwrite it. */
    rc = NXT_UNIT_ERROR;
    recv_msg.fd = -1;
    recv_msg.process = NULL;
    port_msg = buf;
    cm = oob;

    /* Accept exactly one descriptor passed as SCM_RIGHTS. */
    if (oob_size >= CMSG_SPACE(sizeof(int))
        && cm->cmsg_len == CMSG_LEN(sizeof(int))
        && cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
    }

    /* Must be set before the first "goto fail": the cleanup reads it. */
    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
        nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
        goto fail;
    }

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* The payload immediately follows the header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = buf_size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    /*
     * A tracked message for which nxt_unit_tracking_read() returns 0 is
     * dropped without being reported as an error.
     * NOTE(review): presumably this means the sender cancelled it - confirm.
     */
    if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
        rc = NXT_UNIT_OK;

        goto fail;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    if (port_msg->mmap) {
        if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
            goto fail;
        }
    }

    cb = &lib->callbacks;

    switch (port_msg->type) {

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        cb->quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        /*
         * NOTE(review): rc is left at NXT_UNIT_ERROR here, so this message
         * is reported as a failure - confirm that this is intended.
         */
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd);

            goto fail;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        /* The payload must be exactly one pid_t. */
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
                          "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                          (int) sizeof(pid));

            goto fail;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        cb->remove_pid(ctx, pid);

        rc = NXT_UNIT_OK;
        break;

    default:
        nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        goto fail;
    }

fail:

    /* Common cleanup, reached on success as well as on failure. */

    if (recv_msg.fd != -1) {
        close(recv_msg.fd);
    }

    /*
     * NOTE(review): the loop relies on nxt_unit_mmap_buf_free() unlinking
     * the element from recv_msg.incoming_buf - confirm.
     */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (recv_msg.process != NULL) {
        nxt_unit_process_use(ctx, recv_msg.process, -1);
    }

    return rc;
}


/*
 * Handles a NEW_PORT message: validates the payload size and the received
 * descriptor, switches the descriptor to blocking mode and registers the
 * port through the add_port callback.
 *
 * Once the descriptor is stored in the new port, recv_msg->fd is reset to
 * -1 so that the dispatcher does not close it.
 *
 * Returns NXT_UNIT_ERROR on validation or ioctl() failure, otherwise the
 * result of the add_port callback.
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                      nb;
    nxt_unit_impl_t          *lib;
    nxt_unit_port_t          new_port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd);

    /* FIONBIO with a zero argument clears non-blocking mode. */
    nb = 0;

    if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
        /* The stream number must be passed first to match the format. */
        nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
                       "failed: %s (%d)", recv_msg->stream, recv_msg->fd,
                       strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
                          new_port_msg->id);

    new_port.in_fd = -1;
    new_port.out_fd = recv_msg->fd;
    new_port.data = NULL;

    /* The descriptor now belongs to the port. */
    recv_msg->fd = -1;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    return lib->callbacks.add_port(ctx, &new_port);
}


/*
 * Handles a REQ_HEADERS message: obtains a request info object, binds it to
 * the request data found in shared memory, takes over the message's process
 * reference and buffer list, and invokes the request_handler callback.
 *
 * Returns NXT_UNIT_OK once the handler has been called, NXT_UNIT_ERROR when
 * the message is not in shared memory, is too short, or a resource cannot
 * be obtained.  On error the buffers stay with recv_msg for the caller to
 * free.
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    nxt_unit_impl_t               *lib;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    /*
     * Put req_impl into a state nxt_unit_request_info_release() can handle
     * before anything below may fail: no buffers, no hash registration.
     */
    req_impl->stream = recv_msg->stream;
    req_impl->outgoing_buf = NULL;
    req_impl->incoming_buf = NULL;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;

    nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
                          recv_msg->reply_port);

    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;
    req->response_max_fields = 0;

    r = req->request;

    req->content_length = r->content_length;

    /* Content is read from the request buffer, from the preread data on. */
    req->content_buf = req->request_buf;
    req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);

    /* "Move" process reference to req_impl. */
    req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(req_impl->process == NULL)) {
        /*
         * Return the object to the free list; without this it would stay
         * on the active queue forever.  The buffers are still owned by
         * recv_msg at this point.
         */
        nxt_unit_request_info_release(req);

        return NXT_UNIT_ERROR;
    }

    recv_msg->process = NULL;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
    }

    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length, nxt_unit_sptr_get(&r->method),
                   (int) r->target_length, nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    lib->callbacks.request_handler(req);

    return NXT_UNIT_OK;
}


/*
 * Handles a WEBSOCKET message for a request registered in the context's
 * request hash (recv_msg->last is passed as the lookup's "remove" flag).
 *
 * When a websocket_handler is installed and the payload holds at least two
 * bytes, a frame object is prepared and passed to the handler.  The frame
 * data either takes over the message's shared memory buffers or, for plain
 * messages, is referenced in place through a freshly obtained descriptor.
 * For the last message of a stream the close_handler is invoked, or the
 * request is finished with an error when no close_handler is set.
 *
 * Returns NXT_UNIT_OK (also when the stream is unknown) or NXT_UNIT_ERROR
 * when a frame or buffer descriptor cannot be obtained.
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
                                          recv_msg->last);
    if (req_impl == NULL) {
        return NXT_UNIT_OK;
    }

    req = &req_impl->req;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;
        ws_impl->retain_buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                /*
                 * Hand the frame back to the free list; it holds no buffers
                 * yet, so the release only recycles the object.
                 */
                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            /* Not a shared memory buffer: reference the payload in place. */
            b->hdr = NULL;
            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        /* When present, the 4-byte mask is the tail of the frame header. */
        if (ws_impl->ws.header->mask) {
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /* Skip the header so that "free" points at the payload. */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                           ws_impl->ws.header->opcode,
                           ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        req_impl->websocket = 0;

        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Obtains a request info object for ctx and places it on the context's
 * active queue.  An idle object is reused when the free queue is not empty;
 * otherwise a new one is allocated with room for the per-request
 * application data.  The allocation happens with the context mutex
 * released.
 *
 * Returns the object with req.data set up, or NULL when allocation fails.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *first;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *fresh;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* Nothing to reuse: allocate outside of the lock. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        fresh = malloc(sizeof(*fresh) + lib->request_data_size);
        if (nxt_slow_path(fresh == NULL)) {
            return NULL;
        }

        fresh->req.unit = ctx->unit;
        fresh->req.ctx = ctx;

        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        first = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(first);

        fresh = nxt_container_of(first, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &fresh->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    /* Expose the trailing data area only when one was requested. */
    if (lib->request_data_size) {
        fresh->req.data = fresh->extra_data;

    } else {
        fresh->req.data = NULL;
    }

    return fresh;
}


/*
 * Returns a request info object to its context's free queue: drops the
 * response pointers, looks the request up in the request hash with the
 * "remove" flag set when it is marked as websocket, frees both buffer
 * lists, releases the process reference and finally marks the object as
 * released.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *owner;
    nxt_unit_request_info_impl_t  *impl;

    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
    owner = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);

    req->response = NULL;
    req->response_buf = NULL;

    if (impl->websocket) {
        nxt_unit_request_hash_find(&owner->requests, impl->stream, 1);

        impl->websocket = 0;
    }

    /* Each call is expected to unlink the head element of the list. */
    while (impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(impl->outgoing_buf);
    }

    while (impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(impl->incoming_buf);
    }

    /*
     * The process reference is dropped only after the buffers are gone, so
     * that the shared memory they refer to is guaranteed to still exist.
     */
    if (impl->process != NULL) {
        nxt_unit_process_use(req->ctx, impl->process, -1);

        impl->process = NULL;
    }

    /* Move the object from the active queue to the free queue. */
    pthread_mutex_lock(&owner->mutex);

    nxt_queue_remove(&impl->link);
    nxt_queue_insert_tail(&owner->free_req, &impl->link);

    pthread_mutex_unlock(&owner->mutex);

    impl->state = NXT_UNIT_RS_RELEASED;
}


/*
 * Unlinks a request info object from its queue and frees its memory.  The
 * object embedded in the context structure is only unlinked, since it was
 * not allocated separately.
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *owner;

    owner = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    if (req_impl == &owner->req) {
        /* Embedded in the context; nothing to free. */
        return;
    }

    free(req_impl);
}


/*
 * Obtains a websocket frame object for ctx: an idle one from the context's
 * free queue when available, a newly allocated one otherwise.  The frame
 * remembers the context it has to be returned to.
 *
 * Returns the frame or NULL when allocation fails.
 */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *first;
    nxt_unit_ctx_impl_t              *owner;
    nxt_unit_websocket_frame_impl_t  *frame;

    owner = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&owner->mutex);

    if (nxt_queue_is_empty(&owner->free_ws)) {
        /* Nothing to reuse: allocate outside of the lock. */
        pthread_mutex_unlock(&owner->mutex);

        frame = malloc(sizeof(*frame));
        if (nxt_slow_path(frame == NULL)) {
            return NULL;
        }

    } else {
        first = nxt_queue_first(&owner->free_ws);
        nxt_queue_remove(first);

        pthread_mutex_unlock(&owner->mutex);

        frame = nxt_container_of(first, nxt_unit_websocket_frame_impl_t, link);
    }

    frame->ctx_impl = owner;

    return frame;
}


/*
 * Returns a websocket frame to the free queue of the context it came from,
 * after freeing its buffer list and its retained heap block and detaching
 * it from the request.
 */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_ctx_impl_t              *owner;
    nxt_unit_websocket_frame_impl_t  *frame;

    frame = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    /* Each call is expected to unlink the head element of the list. */
    while (frame->buf != NULL) {
        nxt_unit_mmap_buf_free(frame->buf);
    }

    ws->req = NULL;

    /* free(NULL) is a no-op, so no guard is needed. */
    free(frame->retain_buf);
    frame->retain_buf = NULL;

    owner = frame->ctx_impl;

    pthread_mutex_lock(&owner->mutex);

    nxt_queue_insert_tail(&owner->free_ws, &frame->link);

    pthread_mutex_unlock(&owner->mutex);
}


/*
 * Removes a websocket frame object from the queue it is linked into and
 * frees its memory.
 */
static void
nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_link_t  *lnk;

    lnk = &ws_impl->link;

    nxt_queue_remove(lnk);

    free(ws_impl);
}


/*
 * Computes the 16-bit hash of a header field name.  Every character is
 * passed through nxt_lowcase() before it is mixed in.
 *
 * name         - field name, not necessarily NUL-terminated.
 * name_length  - number of bytes of name to hash.
 *
 * Returns the 32-bit accumulator folded onto itself and truncated to
 * 16 bits.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char    c;
    size_t    pos;
    uint32_t  acc;

    acc = 159406; /* Magic value copied from nxt_http_parse.c */

    for (pos = 0; pos < name_length; pos++) {
        c = name[pos];

        /* acc * 17 is the same as (acc << 4) + acc in 32-bit arithmetic. */
        acc = acc * 17 + nxt_lowcase(c);
    }

    acc ^= acc >> 16;

    return acc;
}


/*
 * Reorders the request header fields in place so that fields with the same
 * name hash become adjacent, and records the index of the Content-Length,
 * Content-Type and Cookie fields in the request.
 *
 * The name/value references of a field are offsets (see the arithmetic
 * below), so whenever a field is moved by k slots its offsets are adjusted
 * by k * sizeof(nxt_unit_field_t) to keep pointing at the same bytes.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {

        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            r->content_length_field = i;
            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            r->content_type_field = i;
            break;

        case NXT_UNIT_HASH_COOKIE:
            r->cookie_field = i;
            break;

        default:
            break;
        }

        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash) {
                continue;
            }

            if (j == i + 1) {
                /*
                 * The duplicate is already adjacent.  It still has to
                 * become the new tail of the group: otherwise a later
                 * duplicate would be inserted in front of it (reordering
                 * the values) and the outer loop would visit it again and
                 * overwrite the *_field index recorded above.
                 */
                i++;
                continue;
            }

            /* Save the duplicate, pre-adjusted for its move down to i + 1. */
            f = fields[j];
            f.name.offset += (j - (i + 1)) * sizeof(f);
            f.value.offset += (j - (i + 1)) * sizeof(f);

            /* Shift the fields in between one slot up. */
            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* The moved field is now the last member of the group. */
            i++;
        }
    }
}


/*
 * Prepares the response header area of a request.
 *
 * status            - HTTP status stored in the response.
 * max_fields_count  - number of field slots reserved after the header.
 * max_fields_size   - bytes reserved after the slots for field data.
 *
 * An existing response buffer is reused when it is large enough; otherwise
 * it is freed, the request is put back into NXT_UNIT_RS_START and a new
 * buffer is allocated.  Calling this again before the response is sent is
 * allowed and only logged.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the response was already sent
 * or the buffer could not be allocated.
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      need;
    nxt_unit_buf_t                *rbuf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    need = sizeof(nxt_unit_response_t)
           + max_fields_count * sizeof(nxt_unit_field_t)
           + max_fields_size;

    rbuf = req->response_buf;

    if (rbuf != NULL && need > (uint32_t) (rbuf->end - rbuf->start)) {
        /* The current buffer is too small: drop it and start over. */
        nxt_unit_buf_free(rbuf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;

        rbuf = NULL;
    }

    if (rbuf == NULL) {
        rbuf = nxt_unit_response_buf_alloc(req, need);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }
    }

    memset(rbuf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = rbuf;

    req->response = (nxt_unit_response_t *) rbuf->start;
    req->response->status = status;

    /* Field data is written right after the reserved field slots. */
    rbuf->free = rbuf->start + sizeof(nxt_unit_response_t)
                 + max_fields_count * sizeof(nxt_unit_field_t);

    req->response_max_fields = max_fields_count;
    req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;

    return NXT_UNIT_OK;
}


/*
 * Moves an initialized, not yet sent response into a newly allocated buffer
 * with room for max_fields_count field slots and max_fields_size bytes of
 * field data.
 *
 * Status and content_length are carried over, every field not marked as
 * skipped is copied (name and value each followed by a '\0'), and piggyback
 * content, if present, is copied after the fields.  On success the old
 * buffer is freed and the request refers to the new one.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR when the response is not in the
 * proper state, max_fields_count is smaller than the current number of
 * fields, allocation fails, or the data does not fit; in all error cases
 * the current response is left untouched.
 */
int
nxt_unit_response_realloc(nxt_unit_request_info_t *req,
    uint32_t max_fields_count, uint32_t max_fields_size)
{
    char                          *p;
    uint32_t                      i, buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f, *src;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "realloc: response not init");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "realloc: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
        nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");

        return NXT_UNIT_ERROR;
    }

    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * sizeof(nxt_unit_field_t)
               + max_fields_size;

    nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        nxt_unit_req_warn(req, "realloc: new buf allocation failed");
        return NXT_UNIT_ERROR;
    }

    resp = (nxt_unit_response_t *) buf->start;

    memset(resp, 0, sizeof(nxt_unit_response_t));

    resp->status = req->response->status;
    resp->content_length = req->response->content_length;

    /*
     * Field data starts after the response header AND the reserved field
     * slots, the same layout nxt_unit_response_init() sets up.  Leaving
     * the header size out would make the copied strings overlap the last
     * field slots.
     */
    p = buf->start + sizeof(nxt_unit_response_t)
        + max_fields_count * sizeof(nxt_unit_field_t);
    f = resp->fields;

    for (i = 0; i < req->response->fields_count; i++) {
        src = req->response->fields + i;

        if (nxt_slow_path(src->skip != 0)) {
            continue;
        }

        /* "+ 2" accounts for the two '\0' terminators written below. */
        if (nxt_slow_path(src->name_length + src->value_length + 2
                          > (uint32_t) (buf->end - p)))
        {
            nxt_unit_req_warn(req, "realloc: not enough space for field"
                  " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
                  i, src, src->name_length, src->value_length);

            goto fail;
        }

        nxt_unit_sptr_set(&f->name, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
        *p++ = '\0';

        nxt_unit_sptr_set(&f->value, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
        *p++ = '\0';

        f->hash = src->hash;
        f->skip = 0;
        f->name_length = src->name_length;
        f->value_length = src->value_length;

        resp->fields_count++;
        f++;
    }

    if (req->response->piggyback_content_length > 0) {
        if (nxt_slow_path(req->response->piggyback_content_length
                          > (uint32_t) (buf->end - p)))
        {
            nxt_unit_req_warn(req, "realloc: not enought space for content"
                  " #%"PRIu32", %"PRIu32" required",
                  i, req->response->piggyback_content_length);

            goto fail;
        }

        resp->piggyback_content_length = req->response->piggyback_content_length;

        nxt_unit_sptr_set(&resp->piggyback_content, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
                       req->response->piggyback_content_length);
    }

    buf->free = p;

    /* The copy is complete: switch the request over to the new buffer. */
    nxt_unit_buf_free(req->response_buf);

    req->response = resp;
    req->response_buf = buf;
    req->response_max_fields = max_fields_count;

    return NXT_UNIT_OK;

fail:

    nxt_unit_buf_free(buf);

    return NXT_UNIT_ERROR;
}


/*
 * Returns non-zero when the request state is NXT_UNIT_RS_RESPONSE_INIT or
 * any later state.
 */
int
nxt_unit_response_is_init(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *impl;

    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return (impl->state >= NXT_UNIT_RS_RESPONSE_INIT);
}


/*
 * Appends one header field to an initialized, not yet sent response.
 *
 * The name and the value are copied into the response buffer, each followed
 * by a '\0', and the next free field slot is filled in (hash, lengths,
 * references to the copies).
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the response is not exactly in
 * the NXT_UNIT_RS_RESPONSE_INIT state, all reserved field slots are in use,
 * or the buffer has too little space left.
 */
int
nxt_unit_response_add_field(nxt_unit_request_info_t *req,
    const char *name, uint8_t name_length,
    const char *value, uint32_t value_length)
{
    uint64_t                      need;
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_field: response not initialized or "
                          "already sent");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
        nxt_unit_req_warn(req, "add_field: too many response fields");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    /*
     * Computed in 64 bits: with 32-bit arithmetic a value_length close to
     * UINT32_MAX would wrap around, pass the check and overflow the
     * buffer.  "+ 2" is for the two '\0' terminators.
     */
    need = (uint64_t) name_length + value_length + 2;

    if (nxt_slow_path(need > (uint64_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_field: response buffer overflow");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);
    *buf->free++ = '\0';

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);
    *buf->free++ = '\0';

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/*
 * Appends size bytes from src to the response buffer as piggyback content,
 * i.e. body data sent in the same buffer as the headers.
 *
 * The first call that finds no piggyback content yet records where the
 * content starts and moves the request to
 * NXT_UNIT_RS_RESPONSE_HAS_CONTENT.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the response is not initialized,
 * already sent, or the data does not fit into the buffer.
 */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    uint32_t                      room;
    nxt_unit_buf_t                *rbuf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    rbuf = req->response_buf;
    room = (uint32_t) (rbuf->end - rbuf->free);

    if (nxt_slow_path(size > room)) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (resp->piggyback_content_length == 0) {
        /* No content so far: it begins at the current write position. */
        nxt_unit_sptr_set(&resp->piggyback_content, rbuf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    rbuf->free = nxt_cpymem(rbuf->free, src, size);

    return NXT_UNIT_OK;
}


/*
 * Sends the response buffer (headers plus any piggyback content).
 *
 * When the request is a websocket handshake and the status is 101,
 * nxt_unit_response_upgrade() is invoked first; its result is not checked.
 * On success the request gives up the response buffer, enters
 * NXT_UNIT_RS_RESPONSE_SENT and the buffer descriptor is released.
 *
 * Returns NXT_UNIT_ERROR if the response is not initialized or already
 * sent, otherwise the result of the send operation.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *hdr_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    hdr_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, hdr_buf, 0);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        /* The buffer stays attached to the request on failure. */
        return rc;
    }

    req->response = NULL;
    req->response_buf = NULL;
    req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

    nxt_unit_mmap_buf_release(hdr_buf);

    return NXT_UNIT_OK;
}


/*
 * Returns non-zero when the request state is NXT_UNIT_RS_RESPONSE_SENT or
 * any later state.
 */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *impl;

    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return (impl->state >= NXT_UNIT_RS_RESPONSE_SENT);
}


/*
 * Allocates an outgoing buffer of at least size bytes for the request.
 *
 * A buffer descriptor is taken from the context, linked to the tail of the
 * request's outgoing_buf list and then backed with memory by
 * nxt_unit_get_outgoing_buf().
 *
 * Returns the buffer, or NULL if size exceeds PORT_MMAP_DATA_SIZE or any of
 * the allocations fails.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *desc;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);

    desc = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(desc == NULL)) {
        nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");

        return NULL;
    }

    desc->req = req;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, desc);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                   &req->response_port, size, desc);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        return &desc->buf;
    }

    /* No memory behind the descriptor: unlink it and put it back. */
    nxt_unit_mmap_buf_release(desc);

    return NULL;
}


/*
 * Returns the process a received message belongs to.  The result is cached
 * in recv_msg->process; when it is not set yet, the process is looked up by
 * recv_msg->pid under the library mutex.
 *
 * Returns NULL (after logging a warning) if the process is not found.
 */
static nxt_unit_process_t *
nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    nxt_unit_impl_t  *lib;

    if (recv_msg->process == NULL) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pthread_mutex_lock(&lib->mutex);

        recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);

        pthread_mutex_unlock(&lib->mutex);

        if (recv_msg->process == NULL) {
            nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
                          recv_msg->stream, (int) recv_msg->pid);
        }
    }

    return recv_msg->process;
}


/*
 * Obtains a buffer descriptor for the context: the head of the context's
 * free_buf list when the list is not empty, otherwise a newly malloc()ed
 * descriptor.  Only the ctx_impl back-pointer is set here.
 *
 * Returns NULL if the allocation fails.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_mmap_buf_t  *desc;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* The free list is only touched while the context mutex is held. */
    pthread_mutex_lock(&ctx_impl->mutex);

    desc = ctx_impl->free_buf;

    if (desc != NULL) {
        nxt_unit_mmap_buf_remove(desc);
    }

    pthread_mutex_unlock(&ctx_impl->mutex);

    if (desc == NULL) {
        desc = malloc(sizeof(nxt_unit_mmap_buf_t));
        if (nxt_slow_path(desc == NULL)) {
            return NULL;
        }
    }

    desc->ctx_impl = ctx_impl;

    return desc;
}


/*
 * Unlinks a buffer descriptor from the list it is currently on and puts it
 * onto the free_buf list of its context for reuse.  The memory the
 * descriptor refers to is not touched here.
 */
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    /* The unlink is done before the context mutex is taken. */
    nxt_unit_mmap_buf_remove(mmap_buf);

    ctx_impl = mmap_buf->ctx_impl;

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, mmap_buf);

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/* Counted constant string: a length and a pointer to the characters. */
typedef struct {
    size_t      len;    /* Number of characters in str. */
    const char  *str;   /* Start of the string data. */
} nxt_unit_str_t;


/*
 * Brace initializer for nxt_unit_str_t built from str.
 * NOTE(review): nxt_length() presumably yields the length of a string
 * literal without the terminating '\0' - confirm against its definition.
 */
#define nxt_unit_str(str)  { nxt_length(str), str }


/*
 * Returns the websocket_handshake flag of the request, i.e. non-zero when
 * the request was marked as a websocket handshake.
 */
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    nxt_unit_request_t  *r;

    r = req->request;

    return r->websocket_handshake;
}


/*
 * Marks an initialized, not yet sent response as a websocket upgrade: the
 * request is added to the context's request hash, flagged as websocket and
 * the response status is set to 101.
 *
 * Returns NXT_UNIT_OK (also when the request is already upgraded), or
 * NXT_UNIT_ERROR if the response is not initialized, already sent, or the
 * request cannot be added to the hash.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *impl;

    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);

    rc = nxt_unit_request_hash_add(&ctx_impl->requests, impl);

    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        impl->websocket = 1;

        req->response->status = 101;

        return NXT_UNIT_OK;
    }

    nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

    return NXT_UNIT_ERROR;
}


/*
 * Returns the websocket flag of the request; nxt_unit_response_upgrade()
 * sets it to 1 on success.
 */
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *impl;

    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return impl->websocket;
}


/*
 * Maps a pointer to the extra_data member of a request back to the public
 * request info structure that contains it.
 */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *owner;

    owner = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &owner->req;
}


/*
 * Sends the used part of a response body buffer and releases its
 * descriptor.  An empty buffer is released without sending anything.
 *
 * Returns NXT_UNIT_ERROR if the response headers have not been initialized
 * or not been sent yet, the send error code if sending fails (the
 * descriptor is not released in that case), otherwise NXT_UNIT_OK.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *desc;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    desc = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = desc->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "buf_send: headers not sent yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(buf->start < buf->free)) {
        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, desc, 0);

        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }
    }

    nxt_unit_mmap_buf_release(desc);

    return NXT_UNIT_OK;
}


/*
 * Sends a buffer as the last message of the request's stream.  On success
 * the descriptor and the request are released; on failure the request is
 * finished through nxt_unit_request_done() with the error code.
 */
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *desc;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    desc = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = desc->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    /* The final argument 1 marks the message as the last one. */
    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, desc, 1);

    if (rc != NXT_UNIT_OK) {
        nxt_unit_request_done(req, rc);

        return;
    }

    nxt_unit_mmap_buf_release(desc);

    nxt_unit_request_info_release(req);
}


/*
 * Sends a _NXT_PORT_MSG_DATA message for the given stream that describes
 * the used part of mmap_buf.
 *
 * When the buffer has a shared memory header and is not empty, the message
 * consists of the port message header followed by the mmap descriptor
 * (mmap id, chunk id, size); otherwise only the port message header is
 * sent.  "last" marks the final message of the stream.
 *
 * After a successful send, whole chunks of the buffer that were left unused
 * are given back to the shared memory segment and buf->end is moved down
 * accordingly.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the port_send callback did not
 * report exactly the number of bytes that were submitted.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    u_char                   *end, *last_used, *first_free;
    size_t                   msg_size;
    ssize_t                  res;
    nxt_chunk_id_t           first_free_chunk;
    nxt_unit_buf_t           *buf;
    nxt_unit_impl_t          *lib;
    nxt_port_mmap_header_t   *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    if (hdr != NULL) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);

    } else {
        /* Keep the debug output below from reading uninitialized data. */
        m.mmap_msg.mmap_id = 0;
        m.mmap_msg.chunk_id = 0;
    }

    nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                   stream,
                   (int) m.mmap_msg.mmap_id,
                   (int) m.mmap_msg.chunk_id,
                   (int) m.mmap_msg.size);

    /*
     * The result has to be compared with the size actually submitted:
     * checking against sizeof(m) unconditionally would report a successful
     * header-only send as a failure.
     */
    msg_size = m.msg.mmap ? sizeof(m) : sizeof(m.msg);

    res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, msg_size,
                                   NULL, 0);
    if (nxt_slow_path(res != (ssize_t) msg_size)) {
        return NXT_UNIT_ERROR;
    }

    if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
        /* At least one whole chunk is unused: release the unused tail. */
        last_used = (u_char *) buf->free - 1;

        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
        first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
        end = (u_char *) buf->end;

        nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));

        buf->end = (char *) first_free;
    }

    return NXT_UNIT_OK;
}


/*
 * Public wrapper around nxt_unit_mmap_buf_free(): frees the buffer through
 * the descriptor that contains it.
 */
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_t  *desc;

    desc = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    nxt_unit_mmap_buf_free(desc);
}


/*
 * Gives the memory range [start, end) of a buffer back to its shared memory
 * segment, when the buffer has one, and then releases the descriptor.
 */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_buf_t  *b;

    if (nxt_fast_path(mmap_buf->hdr != NULL)) {
        b = &mmap_buf->buf;

        nxt_unit_mmap_release(mmap_buf->hdr, b->start, b->end - b->start);
    }

    nxt_unit_mmap_buf_release(mmap_buf);
}


/*
 * Returns the buffer that follows buf in its descriptor list, or NULL when
 * buf is the last one.
 */
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_t  *following;

    following = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)->next;

    return (following != NULL) ? &following->buf : NULL;
}


/*
 * Returns PORT_MMAP_DATA_SIZE, the largest size accepted by
 * nxt_unit_response_buf_alloc().
 */
uint32_t
nxt_unit_buf_max(void)
{
    return (uint32_t) PORT_MMAP_DATA_SIZE;
}


/*
 * Returns PORT_MMAP_CHUNK_SIZE, the size of one shared memory chunk.
 */
uint32_t
nxt_unit_buf_min(void)
{
    return (uint32_t) PORT_MMAP_CHUNK_SIZE;
}


/*
 * Writes size bytes starting at start as response body.
 *
 * If the response headers are still pending, as much data as fits is added
 * to the header buffer as piggyback content and the headers are sent.  The
 * rest is sent in separate buffers of at most PORT_MMAP_DATA_SIZE bytes
 * each.
 *
 * Returns NXT_UNIT_OK, or the first error code encountered.
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    int                           rc;
    uint32_t                      chunk;
    const char                    *src;
    nxt_unit_mmap_buf_t           out;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    src = start;

    /* A non-NULL response_buf means the headers were not sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        chunk = req->response_buf->end - req->response_buf->free;
        chunk = nxt_min(size, chunk);

        rc = nxt_unit_response_add_content(req, src, chunk);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        src += chunk;
        size -= chunk;
    }

    while (size > 0) {
        chunk = nxt_min(size, PORT_MMAP_DATA_SIZE);

        /* "out" is a stack descriptor, not linked into any list. */
        rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                       &req->response_port, chunk, &out);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        out.buf.free = nxt_cpymem(out.buf.free, src, chunk);

        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &out, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            /* Nothing was sent: hand the memory back. */
            nxt_unit_mmap_release(out.hdr, out.buf.start,
                                  out.buf.end - out.buf.start);

            return rc;
        }

        src += chunk;
        size -= chunk;
    }

    return NXT_UNIT_OK;
}


/*
 * Writes a response body produced by a caller supplied read callback.
 *
 * read_info->read() is called to fill buffers until read_info->eof is set.
 * If the response headers are still pending, the free space of the header
 * buffer is filled first and sent together with the headers; further data
 * goes into buffers of min(read_info->buf_size, PORT_MMAP_DATA_SIZE) bytes.
 *
 * Returns NXT_UNIT_OK, NXT_UNIT_ERROR on a read or allocation failure, or
 * the error code of a failed send.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int             rc;
    ssize_t         got;
    nxt_unit_buf_t  *out;

    /* A non-NULL response_buf means the headers were not sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {

        /* A zero-length add switches the headers buf into content mode. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        out = req->response_buf;

        while (out->free < out->end) {
            got = read_info->read(read_info, out->free, out->end - out->free);
            if (nxt_slow_path(got < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Data was written in place, so account for it by hand. */
            out->free += got;
            req->response->piggyback_content_length += got;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        out = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
                                                       PORT_MMAP_DATA_SIZE));
        if (nxt_slow_path(out == NULL)) {
            nxt_unit_req_error(req, "Failed to allocate buf for content");

            return NXT_UNIT_ERROR;
        }

        /* Fill the buffer until it is full or the source is exhausted. */
        while (!read_info->eof && out->free < out->end) {
            got = read_info->read(read_info, out->free, out->end - out->free);
            if (nxt_slow_path(got < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_buf_free(out);

                return NXT_UNIT_ERROR;
            }

            out->free += got;
        }

        rc = nxt_unit_buf_send(out);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Reads up to size bytes of request body into dst, advancing the request's
 * content_buf position and decreasing content_length by the amount read.
 *
 * Returns the number of bytes copied.
 */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    ssize_t  n;

    n = nxt_unit_buf_read(&req->content_buf, &req->content_length, dst, size);

    return n;
}


/*
 * Copies up to size bytes from a chain of buffers into dst.
 *
 * b    - in/out: buffer to start reading from; on return the buffer the
 *        next read should continue with (NULL when the chain is used up).
 * len  - in/out: remaining content length, decreased by the bytes copied.
 *
 * Data is taken from the [free, end) range of each buffer and the free
 * pointer is advanced past the bytes consumed.
 *
 * Returns the number of bytes copied.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *out;
    size_t          left, avail, chunk, done;
    nxt_unit_buf_t  *cur;

    out = dst;
    left = size;
    cur = *b;

    while (cur != NULL) {
        avail = cur->end - cur->free;
        chunk = nxt_min(left, avail);

        out = nxt_cpymem(out, cur->free, chunk);

        cur->free += chunk;
        left -= chunk;

        /* Step to the next buffer unless this one still holds data. */
        if (left > 0 || cur->free == cur->end) {
            cur = nxt_unit_buf_next(cur);
        }

        if (left == 0) {
            break;
        }
    }

    *b = cur;

    done = size - left;

    *len -= done;

    return done;
}


/*
 * Finishes processing of a request.
 *
 * rc == NXT_UNIT_OK:
 *   - if no response was initialized, a default "200" response with a
 *     "Content-Type: text/plain" field is created;
 *   - if the response was not sent yet, it is sent as the last message of
 *     the stream and the function returns;
 *   - otherwise an empty last _NXT_PORT_MSG_DATA message is sent.
 *
 * rc != NXT_UNIT_OK (or the default response could not be created):
 *   a last _NXT_PORT_MSG_RPC_ERROR message is sent.
 *
 * In the last two cases the request is released afterwards.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    ssize_t                       res;
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        /*
         * nxt_unit_response_add_field() stores the name and the value each
         * followed by a '\0' and checks for name + value + 2 free bytes,
         * so the two terminators have to be reserved as well.
         */
        size = nxt_length("Content-Type") + nxt_length("text/plain") + 2;

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                   nxt_length("Content-Type"),
                                   "text/plain", nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends the response as the last message and releases the request. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(req->ctx, &req->response_port,
                                   &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_req_alert(req, "last message send failed: %s (%d)",
                           strerror(errno), errno);
    }

    nxt_unit_request_info_release(req);
}


/*
 * Sends a websocket frame whose payload is one contiguous memory block.
 * Convenience wrapper around nxt_unit_websocket_sendv() with a one-element
 * vector.
 */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    struct iovec  vec;

    vec.iov_base = (void *) start;
    vec.iov_len = size;

    return nxt_unit_websocket_sendv(req, opcode, last, &vec, 1);
}


/*
 * Sends a websocket frame whose payload is gathered from iovcnt vectors.
 *
 * opcode  - stored in the opcode member of the frame header.
 * last    - stored in the fin member of the frame header.
 *
 * The frame header is written at the start of the first buffer and the
 * payload is copied after it.  When a buffer fills up it is sent and a new
 * one is allocated for the remaining data.
 *
 * Returns NXT_UNIT_OK, NXT_UNIT_ERROR if a buffer cannot be allocated or an
 * intermediate send fails, or the error code of the final send.
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                     i, rc;
    size_t                  l, copy;
    uint32_t                payload_len, buf_size;
    const uint8_t           *b;
    nxt_unit_buf_t          *buf;
    nxt_websocket_header_t  *wh;

    /*
     * The final send below is conditional; make sure a defined value is
     * returned even if it is skipped.
     */
    rc = NXT_UNIT_OK;

    payload_len = 0;

    for (i = 0; i < iovcnt; i++) {
        payload_len += iov[i].iov_len;
    }

    /* 10 bytes are reserved for the frame header. */
    buf_size = 10 + payload_len;

    buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
                                                   PORT_MMAP_DATA_SIZE));
    if (nxt_slow_path(buf == NULL)) {
        nxt_unit_req_error(req, "Failed to allocate buf for content");

        return NXT_UNIT_ERROR;
    }

    /* Clear the first two header bytes before the header is filled in. */
    buf->start[0] = 0;
    buf->start[1] = 0;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                /* The buffer is full: send it and continue in a new one. */
                buf_size -= buf->end - buf->start;

                rc = nxt_unit_buf_send(buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    nxt_unit_req_error(req, "Failed to send content");

                    return NXT_UNIT_ERROR;
                }

                buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
                                                          PORT_MMAP_DATA_SIZE));
                if (nxt_slow_path(buf == NULL)) {
                    nxt_unit_req_error(req,
                                       "Failed to allocate buf for content");

                    return NXT_UNIT_ERROR;
                }
            }
        }
    }

    if (buf->free > buf->start) {
        rc = nxt_unit_buf_send(buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");
        }
    }

    return rc;
}


/*
 * Reads up to size bytes of websocket frame payload into dst.  When the
 * frame carries a mask, the bytes read are XORed with the 4-byte mask in
 * place, continuing the mask cycle from the payload offset at which this
 * read started.
 *
 * Returns the number of bytes read.
 */
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    ssize_t   n;
    uint8_t   *out;
    uint64_t  k, phase;

    n = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
                          dst, size);

    if (ws->mask != NULL) {
        out = dst;

        /*
         * content_length was already reduced by n, so this is the payload
         * offset of the first byte just read, modulo the mask length.
         */
        phase = (ws->payload_len - ws->content_length - n) % 4;

        for (k = 0; k < (uint64_t) n; k++) {
            out[k] ^= ws->mask[ (k + phase) % 4 ];
        }
    }

    return n;
}


/*
 * Makes sure the data of a websocket frame is held in memory referenced by
 * the frame itself.
 *
 * Nothing is done when the frame already has a retained copy or when its
 * first buffer has a shared memory header.  Otherwise the [start, end)
 * range of the first buffer is copied into malloc()ed memory and the buffer
 * is re-pointed at the copy, with free set to its beginning.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the allocation fails.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *copy;
    size_t                           len;
    nxt_unit_buf_t                   *data;
    nxt_unit_websocket_frame_impl_t  *frame;

    frame = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    if (frame->retain_buf != NULL) {
        return NXT_UNIT_OK;
    }

    if (frame->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    data = &frame->buf->buf;
    len = data->end - data->start;

    copy = malloc(len);
    if (nxt_slow_path(copy == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(copy, data->start, len);

    data->start = copy;
    data->free = copy;
    data->end = copy + len;

    /* Remembered so that the frame release can free the copy. */
    frame->retain_buf = copy;

    return NXT_UNIT_OK;
}


/*
 * Public entry point for finishing a websocket frame: hands the frame to
 * nxt_unit_websocket_frame_release().
 */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_t  *frame;

    frame = ws;

    nxt_unit_websocket_frame_release(frame);
}


/*
 * Reserves "n" consecutive chunks in one of the outgoing shared memory
 * segments of "process" and stores the index of the first chunk in "*c".
 *
 * Existing segments are scanned under process->outgoing.mutex.  When none
 * of them yields a run of "n" chunks, a new segment is created with
 * nxt_unit_new_mmap() and "*c" is set to 0.
 *
 * Returns the header of the segment holding the reservation, or the result
 * of nxt_unit_new_mmap() (NULL on failure).
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
{
    int                     res, nchunks, i;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_port_mmap_header_t  *hdr;

    pthread_mutex_lock(&process->outgoing.mutex);

    mm_end = process->outgoing.elts + process->outgoing.size;

    for (mm = process->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /*
         * Only use a segment whose sent_over is either 0xFFFF or the id of
         * the port this allocation is for.
         */
        if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
            continue;
        }

        *c = 0;

        /*
         * NOTE(review): nxt_port_mmap_get_free_chunk() presumably finds a
         * free chunk at or after *c and marks it busy -- confirm.
         */
        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            /* Try to extend the run one chunk at a time. */
            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Extension failed: give back what was taken so far. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    /* Resume the search just past the chunk that blocked. */
                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock;
            }
        }
    }

    /* No existing segment fits; the new one has its first n chunks busy. */
    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, process, port_id, n);

unlock:

    pthread_mutex_unlock(&process->outgoing.mutex);

    return hdr;
}


/*
 * Returns a pointer to element "i" of the mmaps array, growing the array
 * when needed.  Newly added elements are zero-filled and mmaps->size is
 * raised to cover index "i".
 *
 * Capacity starts at i + 1 for an empty array, then doubles while below 16
 * and grows by half afterwards.
 *
 * Returns NULL when "i" cannot be represented (i + 1 would wrap) or when
 * the reallocation fails.  On failure the existing array is left intact.
 */
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
    uint32_t         cap;
    nxt_unit_mmap_t  *elts;

    /* i + 1 is used below as the required element count. */
    if (nxt_slow_path(i == UINT32_MAX)) {
        return NULL;
    }

    cap = mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != mmaps->cap) {

        /*
         * Keep the old pointer until realloc() succeeds: on failure the
         * original block is still valid and still owned by mmaps.
         */
        elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
        if (nxt_slow_path(elts == NULL)) {
            return NULL;
        }

        mmaps->elts = elts;

        memset(mmaps->elts + mmaps->cap, 0,
               sizeof(*mmaps->elts) * (cap - mmaps->cap));

        mmaps->cap = cap;
    }

    if (i + 1 > mmaps->size) {
        mmaps->size = i + 1;
    }

    return mmaps->elts + i;
}


/*
 * Creates a new outgoing shared memory segment for "process", marks its
 * first "n" chunks busy and passes the segment's descriptor to the peer
 * with nxt_unit_send_mmap().
 *
 * The function releases process->outgoing.mutex around the send and takes
 * it again afterwards, so it expects the caller to hold that mutex.
 *
 * Returns the header of the new segment, or NULL on failure.  On every
 * failure path after the array slot was reserved, the slot is dropped by
 * decrementing process->outgoing.size, and the descriptor (once created)
 * is closed.
 *
 * NOTE(review): while the mutex is released another thread may change the
 * outgoing array; the final size-- assumes this did not happen -- confirm.
 */
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, int n)
{
    int                     i, fd, rc;
    void                    *mem;
    char                    name[64];
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = process->lib;

    /* Reserve the next slot; this also bumps process->outgoing.size. */
    mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        goto remove_fail;
    }

    nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
                       strerror(errno), errno);

        goto remove_fail;
    }

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case. */
    shm_unlink(name);

    fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        goto remove_fail;
    }

    if (nxt_slow_path(shm_unlink(name) == -1)) {
        nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
                      strerror(errno), errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
        nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        /* The descriptor is ours from here on; do not leak it. */
        close(fd);

        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        close(fd);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* All bits set: every chunk starts out free. */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = process->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = process->pid;
    hdr->sent_over = port_id->id;

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    /* The send is done without holding the outgoing mutex. */
    pthread_mutex_unlock(&process->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port_id, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) process->pid);
    }

    /* The local descriptor is closed whether or not the send worked. */
    close(fd);

    pthread_mutex_lock(&process->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    /* Give back the slot reserved by nxt_unit_mmap_at() above. */
    process->outgoing.size--;

    return NULL;
}


/*
 * Sends a _NXT_PORT_MSG_MMAP message to "port_id" with descriptor "fd"
 * attached as SCM_RIGHTS ancillary data, using the port_send callback.
 *
 * Returns NXT_UNIT_OK when the callback reports the whole message as sent,
 * NXT_UNIT_ERROR otherwise.
 */
static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
{
    ssize_t          sent;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /*
     * The control buffer is cleared completely, padding included:
     * Go 1.11 validates cmsghdr treating the padding as part of the length
     * (see its Cmsghdr definition and socketControlMessageHeaderAndData).
     */
    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_type = SCM_RIGHTS;
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));

    /*
     * The descriptor is stored with memcpy() rather than through an
     * (int *) cast of CMSG_DATA(): GCC 4.4 at -O2/3/s may warn that such
     * a dereference breaks strict-aliasing rules.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    msg.type = _NXT_PORT_MSG_MMAP;
    msg.pid = lib->pid;
    msg.stream = 0;
    msg.reply_port = 0;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    sent = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
                                    &cmsg, sizeof(cmsg));

    if (nxt_fast_path(sent == sizeof(msg))) {
        return NXT_UNIT_OK;
    }

    nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
                  (int) port_id->pid, strerror(errno), errno);

    return NXT_UNIT_ERROR;
}


/*
 * Reserves enough shared memory chunks to hold "size" bytes for sending to
 * "port_id" and describes the reserved area in "mmap_buf".
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR when nxt_unit_mmap_get() fails.
 */
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, uint32_t size,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    char                    *area;
    uint32_t                nchunks;
    nxt_chunk_id_t          first;
    nxt_port_mmap_header_t  *hdr;

    /* Round the byte count up to whole chunks. */
    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    hdr = nxt_unit_mmap_get(ctx, process, port_id, &first, nchunks);

    if (nxt_fast_path(hdr != NULL)) {
        area = (char *) nxt_port_mmap_chunk_start(hdr, first);

        mmap_buf->hdr = hdr;
        mmap_buf->buf.start = area;
        mmap_buf->buf.free = area;
        mmap_buf->buf.end = area + nchunks * PORT_MMAP_CHUNK_SIZE;
        mmap_buf->port_id = *port_id;

        nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
                      (int) hdr->id, (int) first,
                      (int) (nchunks * PORT_MMAP_CHUNK_SIZE));

        return NXT_UNIT_OK;
    }

    return NXT_UNIT_ERROR;
}


/*
 * Registers a shared memory segment received from process "pid" as
 * descriptor "fd".
 *
 * The descriptor is mapped with the size reported by fstat(), the pids in
 * the segment header are checked against "pid" and the library's own pid,
 * and the mapping is stored in the process's incoming array at index
 * hdr->id.
 *
 * Error paths unmap with the same length that was passed to mmap(), so a
 * segment whose size differs from PORT_MMAP_SIZE cannot cause unrelated
 * memory to be unmapped.
 *
 * The descriptor itself is not closed here.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise.
 *
 * NOTE(review): an existing mapping already stored at index hdr->id is
 * overwritten without being unmapped -- confirm whether ids can repeat.
 */
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
    int                      rc;
    void                     *mem;
    struct stat              mmap_stat;
    nxt_unit_mmap_t          *mm;
    nxt_unit_impl_t          *lib;
    nxt_unit_process_t       *process;
    nxt_port_mmap_header_t   *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);

    pthread_mutex_lock(&lib->mutex);

    /* Takes a reference on the process; it is dropped at "fail" below. */
    process = nxt_unit_process_find(ctx, pid, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
                      (int) pid, fd);

        return NXT_UNIT_ERROR;
    }

    rc = NXT_UNIT_ERROR;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
                      strerror(errno), errno);

        goto fail;
    }

    mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
               MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
                      strerror(errno), errno);

        goto fail;
    }

    hdr = mem;

    if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {

        nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
                      "detected: %d != %d or %d != %d", (int) hdr->src_pid,
                      (int) pid, (int) hdr->dst_pid, (int) lib->pid);

        munmap(mem, mmap_stat.st_size);

        goto fail;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");

        munmap(mem, mmap_stat.st_size);

    } else {
        mm->hdr = hdr;

        hdr->sent_over = 0xFFFFu;

        rc = NXT_UNIT_OK;
    }

    pthread_mutex_unlock(&process->incoming.mutex);

fail:

    nxt_unit_process_use(ctx, process, -1);

    return rc;
}


/*
 * Puts an mmaps container into its empty state: no array, zero size and
 * capacity, and a mutex initialized with default attributes.
 */
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
    pthread_mutex_init(&mmaps->mutex, NULL);

    mmaps->elts = NULL;
    mmaps->cap = 0;
    mmaps->size = 0;
}


/*
 * Adds "i" to the use counter of "process".  When a negative "i" takes the
 * counter to zero, both mmaps containers are destroyed and the process
 * structure is freed.
 */
static void
nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
{
    long  before;

    /* The value observed is the counter before the addition. */
    before = nxt_atomic_fetch_add(&process->use_count, i);

    if (i >= 0 || before != -i) {
        return;
    }

    nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);

    nxt_unit_mmaps_destroy(&process->incoming);
    nxt_unit_mmaps_destroy(&process->outgoing);

    free(process);
}


/*
 * Unmaps every segment recorded in the container (the first "size"
 * elements), frees the element array and destroys the mutex.
 */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *cur, *last;

    cur = mmaps->elts;

    if (cur != NULL) {

        for (last = cur + mmaps->size; cur < last; cur++) {
            munmap(cur->hdr, PORT_MMAP_SIZE);
        }

        free(mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Looks up the incoming segment with index "id" for "process".
 *
 * Returns the stored header pointer, or NULL when "id" is not below the
 * current size of the incoming array.  No locking is done here.
 */
static nxt_port_mmap_header_t *
nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    uint32_t id)
{
    if (nxt_fast_path(process->incoming.size > id)) {
        return process->incoming.elts[id].hdr;
    }

    return NULL;
}


/*
 * Consumes the tracking message at the front of "recv_msg" and tries to
 * clear the tracking slot it names.
 *
 * recv_msg->start and recv_msg->size are advanced past the tracking
 * message before anything else is looked up.
 *
 * Returns 0 when the message is too short, the sending process or the
 * segment cannot be found, or the compare-and-set on the tracking slot
 * fails (logged as "tracking cancelled"); otherwise returns the non-zero
 * result of the compare-and-set.
 */
static int
nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                           rc;
    nxt_chunk_id_t                c;
    nxt_unit_process_t            *process;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return 0;
    }

    tracking_msg = recv_msg->start;

    /* Step over the tracking message; the rest is the actual payload. */
    recv_msg->start = tracking_msg + 1;
    recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return 0;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
    if (nxt_slow_path(hdr == NULL)) {
        pthread_mutex_unlock(&process->incoming.mutex);

        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
                      "invalid mmap id %d,%"PRIu32,
                      recv_msg->stream, (int) process->pid,
                      tracking_msg->mmap_id);

        return 0;
    }

    /*
     * NOTE(review): tracking_id comes from the message and is used as an
     * index without a range check here -- confirm it is validated elsewhere.
     */
    c = tracking_msg->tracking_id;

    /* Swap the slot from this stream's id to 0; fails if it changed. */
    rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);

    if (rc == 0) {
        nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
                       recv_msg->stream);

        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return rc;
}


/*
 * Interprets the body of "recv_msg" as an array of shared memory
 * descriptors (nxt_port_mmap_msg_t) and builds the matching list of
 * incoming buffers in recv_msg->incoming_buf.
 *
 * recv_msg->start and recv_msg->size are redirected to the data area
 * named by the first descriptor.
 *
 * Only complete descriptors are processed: trailing bytes that do not make
 * up a whole nxt_port_mmap_msg_t are ignored instead of being read as a
 * descriptor that extends past the end of the message.
 *
 * Returns NXT_UNIT_OK on success.  Returns NXT_UNIT_ERROR when the message
 * is too short, the sending process is unknown, a descriptor names an
 * unknown segment, or a buffer cannot be allocated; buffers linked before
 * the failure remain on recv_msg->incoming_buf.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                    *start;
    uint32_t                size;
    nxt_unit_process_t      *process;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    pthread_mutex_lock(&process->incoming.mutex);

    /* Stop as soon as a whole descriptor no longer fits before "end". */
    for (; mmap_msg + 1 <= end; mmap_msg++) {
        hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
        if (nxt_slow_path(hdr == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "invalid mmap id %d,%"PRIu32,
                          recv_msg->stream, (int) process->pid,
                          mmap_msg->mmap_id);

            return NXT_UNIT_ERROR;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* True only for the first descriptor: start is replaced below. */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            /* The current area has no buffer owning it; free it here. */
            nxt_unit_mmap_release(hdr, start, size);

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return NXT_UNIT_OK;
}


/*
 * Returns the area [start, start + size) of segment "hdr" to the free map.
 * The area is first overwritten with the byte 0xA5, then every chunk
 * touched by it is marked free, one chunk per PORT_MMAP_CHUNK_SIZE step.
 *
 * Always returns NXT_UNIT_OK.
 */
static int
nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
{
    u_char          *pos, *limit;
    nxt_chunk_id_t  chunk;

    memset(start, 0xA5, size);

    pos = start;
    limit = pos + size;

    for (chunk = nxt_port_mmap_chunk_id(hdr, pos);
         pos < limit;
         pos += PORT_MMAP_CHUNK_SIZE, chunk++)
    {
        nxt_port_mmap_set_chunk_free(hdr->free_map, chunk);
    }

    return NXT_UNIT_OK;
}


/*
 * Key comparison callback for the process hash: "data" is a stored
 * nxt_unit_process_t (the type inserted by nxt_unit_process_get()), and the
 * query key is a pid_t.
 *
 * Returns NXT_OK when the key has the size of pid_t and equals the pid of
 * the stored process, NXT_DECLINED otherwise.
 */
static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_process_t  *process;

    /* The hash holds nxt_unit_process_t entries, not nxt_process_t. */
    process = data;

    if (lhq->key.length == sizeof(pid_t)
        && *(pid_t *) lhq->key.start == process->pid)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


/*
 * Hash prototype for the table of known processes (lib->processes):
 * default level-hash parameters, nxt_unit_lvlhsh_pid_test() as the key
 * comparison callback, and the stock level-hash allocator and deallocator.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Prepares "lhq" for a lookup in the process hash keyed by "*pid".
 *
 * The query keeps a pointer to "*pid" rather than a copy, so the pid
 * object must stay valid for as long as the query is used.
 */
static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
{
    lhq->proto = &lvlhsh_processes_proto;

    lhq->key.start = (u_char *) pid;
    lhq->key.length = sizeof(*pid);
    lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
}


/*
 * Returns the process entry for "pid", creating and registering it in
 * lib->processes when it does not exist yet.  In both cases one reference
 * is taken on behalf of the caller; a newly created entry additionally
 * keeps the initial reference set at creation.
 *
 * No locking is done here; the callers visible in this file hold
 * lib->mutex around the call.
 *
 * Returns NULL when the entry cannot be allocated or inserted.
 */
static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;
    nxt_lvlhsh_query_t  lhq;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
        process = lhq.value;
        nxt_unit_process_use(ctx, process, 1);

        return process;
    }

    process = malloc(sizeof(nxt_unit_process_t));
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);

        return NULL;
    }

    process->pid = pid;
    process->use_count = 1;
    process->next_port_id = 0;
    process->lib = lib;

    nxt_queue_init(&process->ports);

    nxt_unit_mmaps_init(&process->incoming);
    nxt_unit_mmaps_init(&process->outgoing);

    lhq.replace = 0;
    lhq.value = process;

    if (nxt_slow_path(nxt_lvlhsh_insert(&lib->processes, &lhq) != NXT_OK)) {
        nxt_unit_warn(ctx, "process %d insert failed", (int) pid);

        pthread_mutex_destroy(&process->outgoing.mutex);
        pthread_mutex_destroy(&process->incoming.mutex);
        free(process);

        /*
         * Return right away: the entry is gone, so there is nothing to
         * take a reference on.
         */
        return NULL;
    }

    /* Reference handed to the caller. */
    nxt_unit_process_use(ctx, process, 1);

    return process;
}


/*
 * Looks up the process entry for "pid" in lib->processes.
 *
 * With "remove" set, the entry is deleted from the hash and returned
 * without touching its use counter.  Otherwise the entry stays in the hash
 * and one reference is taken for the caller.
 *
 * Returns NULL when no entry for "pid" exists.
 */
static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
{
    int                 rc;
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *found;
    nxt_lvlhsh_query_t  lhq;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_process_lhq_pid(&lhq, &pid);

    rc = remove ? nxt_lvlhsh_delete(&lib->processes, &lhq)
                : nxt_lvlhsh_find(&lib->processes, &lhq);

    if (rc != NXT_OK) {
        return NULL;
    }

    found = lhq.value;

    if (!remove) {
        nxt_unit_process_use(ctx, found, 1);
    }

    return found;
}


/*
 * Takes one entry out of lib->processes via nxt_lvlhsh_retrieve() and
 * returns it; nxt_unit_done() treats a NULL result as "hash is empty".
 */
static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
{
    nxt_unit_process_t  *process;

    process = nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto,
                                  NULL);

    return process;
}


/*
 * Main loop: keeps calling nxt_unit_run_once() for as long as lib->online
 * is set.
 *
 * Returns the result of the last nxt_unit_run_once() call, or NXT_UNIT_OK
 * if the loop body never ran.
 */
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
    int              last;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    last = NXT_UNIT_OK;

    for ( ;; ) {
        if (!nxt_fast_path(lib->online)) {
            break;
        }

        last = nxt_unit_run_once(ctx);
    }

    return last;
}


/*
 * Receives one message on the context's read port and hands it to
 * nxt_unit_process_msg().
 *
 * When the context has its own read descriptor (read_port_fd != -1) the
 * message is read with nxt_unit_port_recv(); otherwise the port_recv
 * callback is used with the context's read port id.
 *
 * Returns the result of nxt_unit_process_msg(), or NXT_UNIT_ERROR when
 * nothing was received.
 */
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    char                 data[4096];
    char                 ctl[256];
    ssize_t              nread;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Only the leading control message header is cleared. */
    memset(ctl, 0, sizeof(struct cmsghdr));

    if (ctx_impl->read_port_fd == -1) {
        nread = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
                                         data, sizeof(data),
                                         ctl, sizeof(ctl));
    } else {
        nread = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
                                   data, sizeof(data),
                                   ctl, sizeof(ctl));
    }

    if (nxt_slow_path(nread <= 0)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, data, nread,
                              ctl, sizeof(ctl));

#if (NXT_DEBUG)
    /* Poison the consumed bytes in debug builds. */
    memset(data, 0xAC, nread);
#endif

    return rc;
}


/*
 * Tears down the whole library instance that "ctx" belongs to: frees every
 * context on lib->contexts, removes every process left in the process
 * hash, destroys lib->mutex and frees the library structure.
 *
 * "ctx" and every other context of this library must not be used after
 * the call.
 */
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_process_t   *process;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {

        nxt_unit_ctx_free(&ctx_impl->ctx);

    } nxt_queue_loop;

    for ( ;; ) {
        pthread_mutex_lock(&lib->mutex);

        process = nxt_unit_process_pop_first(lib);
        if (process == NULL) {
            pthread_mutex_unlock(&lib->mutex);

            break;
        }

        /*
         * nxt_unit_remove_process() releases lib->mutex itself, which is
         * why there is no unlock on this path.
         */
        nxt_unit_remove_process(ctx, process);
    }

    pthread_mutex_destroy(&lib->mutex);

    free(lib);
}


/*
 * Creates an additional context for the library that "ctx" belongs to.
 *
 * A new port is created for the context and announced through the
 * library's ready port; the context is then initialized with "data" and
 * bound to the new port as its read port.
 *
 * Returns the new context, or NULL when allocation, port creation, the
 * port announcement or context initialization fails.  On failure the port
 * (if already created) is removed through the remove_port callback and the
 * context memory is freed.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                  rc, peer_fd;
    nxt_unit_impl_t      *lib;
    nxt_unit_port_id_t   read_port;
    nxt_unit_ctx_impl_t  *impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* The per-request data area is allocated together with the context. */
    impl = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
    if (nxt_slow_path(impl == NULL)) {
        nxt_unit_warn(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_create_port(ctx, &read_port, &peer_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        free(impl);

        return NULL;
    }

    rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &read_port, peer_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        lib->callbacks.remove_port(ctx, &read_port);

        close(peer_fd);

        free(impl);

        return NULL;
    }

    /* The peer end is no longer needed locally once it has been sent. */
    close(peer_fd);

    rc = nxt_unit_ctx_init(lib, impl, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        lib->callbacks.remove_port(ctx, &read_port);

        free(impl);

        return NULL;
    }

    impl->read_port_id = read_port;

    return &impl->ctx;
}


/*
 * Releases everything owned by a context: finishes requests that are still
 * active, frees cached buffers, request structures and websocket frame
 * structures, destroys the context mutex and unlinks the context from the
 * library's context list.
 *
 * The context memory itself is freed unless it is the library's embedded
 * main context.
 */
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Requests still in flight are completed with an error status. */
    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    /*
     * The two ctx_buf entries are part of the context structure, so they
     * are only unlinked here and never passed to free().
     */
    nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);

    /* Whatever is left on the free list is unlinked and freed. */
    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_remove(mmap_buf);
        free(mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(ws_impl);

    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    nxt_queue_remove(&ctx_impl->link);

    /* The main context is embedded in the library structure. */
    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }
}


/*
 * Socket type used for port socket pairs.  As written, "0 || X" evaluates
 * to X, so SOCK_SEQPACKET is used whenever the platform provides it and
 * SOCK_DGRAM otherwise.  To test SOCK_DGRAM on all platforms, change the
 * condition to "0 && NXT_HAVE_AF_UNIX_SOCK_SEQPACKET".
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/*
 * Fills "port_id" from a (pid, id) pair and precomputes its hash.
 *
 * The hash is taken over a temporary nxt_unit_port_hash_id_t holding the
 * same pid and id.
 */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  port_hash_id;

    /*
     * The hash covers every byte of the key structure.  Zero it first so
     * that padding bytes, if the structure has any, contribute a fixed
     * value rather than whatever happened to be on the stack.
     */
    memset(&port_hash_id, 0, sizeof(port_hash_id));

    port_hash_id.pid = pid;
    port_hash_id.id = id;

    port_id->pid = pid;
    port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
    port_id->id = id;
}


/*
 * Creates a new port and announces it to the port "dst".
 *
 * On success the id of the new port is stored in "*port_id".  When the
 * announcement fails the new port is removed again through the remove_port
 * callback.  The peer descriptor returned by port creation is closed in
 * both cases.
 *
 * Returns NXT_UNIT_OK on success or the failing step's result.
 */
int
nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *port_id)
{
    int                 rc, peer_fd;
    nxt_unit_impl_t     *lib;
    nxt_unit_port_id_t  created;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = nxt_unit_create_port(ctx, &created, &peer_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    rc = nxt_unit_send_port(ctx, dst, &created, peer_fd);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        lib->callbacks.remove_port(ctx, &created);

    } else {
        *port_id = created;
    }

    close(peer_fd);

    return rc;
}


/*
 * Creates a socket pair for a new port owned by this process and registers
 * the port through the add_port callback.
 *
 * The first socket becomes the port's input descriptor; the second one is
 * returned in "*fd" for the caller to pass to the peer.  The port id is
 * built from the library pid and the next free port number of the local
 * process entry, allocated under lib->mutex.
 *
 * Returns NXT_UNIT_OK with "*port_id" and "*fd" filled in, NXT_UNIT_ERROR
 * when the socket pair or the process entry cannot be obtained, or the
 * add_port callback's result when it fails.  Both sockets are closed on
 * every failure after they were created.
 */
static int
nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
{
    int                 rc, pair[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     new_port;
    nxt_unit_process_t  *self;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, pair);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   pair[0], pair[1]);

    pthread_mutex_lock(&lib->mutex);

    self = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(self == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        close(pair[0]);
        close(pair[1]);

        return NXT_UNIT_ERROR;
    }

    /* The port number is taken while lib->mutex is still held. */
    nxt_unit_port_id_init(&new_port.id, lib->pid, self->next_port_id++);

    new_port.in_fd = pair[0];
    new_port.out_fd = -1;
    new_port.data = NULL;

    pthread_mutex_unlock(&lib->mutex);

    /* Drop the reference taken by nxt_unit_process_get(). */
    nxt_unit_process_use(ctx, self, -1);

    rc = lib->callbacks.add_port(ctx, &new_port);

    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        *port_id = new_port.id;
        *fd = pair[1];

        return rc;
    }

    nxt_unit_warn(ctx, "create_port: add_port() failed");

    close(pair[0]);
    close(pair[1]);

    return rc;
}


/*
 * Announces the port "new_port" to the port "dst": sends a
 * _NXT_PORT_MSG_NEW_PORT message describing it, with descriptor "fd"
 * attached as SCM_RIGHTS ancillary data, through the port_send callback.
 *
 * Returns NXT_UNIT_OK when the callback reports the whole message as sent,
 * NXT_UNIT_ERROR otherwise.
 */
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd)
{
    ssize_t          sent;
    nxt_unit_impl_t  *lib;

    struct {
        nxt_port_msg_t            msg;
        nxt_port_msg_new_port_t   new_port;
    } m;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Control data: one descriptor, with all padding bytes zeroed. */
    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_type = SCM_RIGHTS;
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));

    /*
     * The descriptor is stored with memcpy() rather than through an
     * (int *) cast of CMSG_DATA(): GCC 4.4 at -O2/3/s may warn that such
     * a dereference breaks strict-aliasing rules.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    /* Message header. */
    m.msg.type = _NXT_PORT_MSG_NEW_PORT;
    m.msg.pid = lib->pid;
    m.msg.stream = 0;
    m.msg.reply_port = 0;
    m.msg.last = 0;
    m.msg.mmap = 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    /* Description of the port being announced. */
    m.new_port.pid = new_port->pid;
    m.new_port.id = new_port->id;
    m.new_port.type = NXT_PROCESS_WORKER;
    m.new_port.max_size = 16 * 1024;
    m.new_port.max_share = 64 * 1024;

    sent = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
                                    &cmsg, sizeof(cmsg));

    if (sent == sizeof(m)) {
        return NXT_UNIT_OK;
    }

    return NXT_UNIT_ERROR;
}


/*
 * Registers "port" with the library: a private copy is added to the port
 * hash and linked into the port list of the owning process, which is
 * created on demand.
 *
 * The reference on the process taken here is kept by the registered port
 * on success and dropped again on failure.  The process's next_port_id is
 * raised so that it stays above every registered port number.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR when the process entry or
 * the port copy cannot be allocated, or the result of
 * nxt_unit_port_hash_add() when it fails.  The port copy is freed on
 * failure.
 */
int
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_process_t    *process;
    nxt_unit_port_impl_t  *new_port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
                   port->id.pid, port->id.id,
                   port->in_fd, port->out_fd);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_get(ctx, port->id.pid);
    if (nxt_slow_path(process == NULL)) {
        rc = NXT_UNIT_ERROR;
        goto unlock;
    }

    if (port->id.id >= process->next_port_id) {
        process->next_port_id = port->id.id + 1;
    }

    new_port = malloc(sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(new_port == NULL)) {
        rc = NXT_UNIT_ERROR;
        goto unlock;
    }

    new_port->port = *port;

    rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        /* Nothing refers to the copy yet; release it instead of leaking. */
        free(new_port);

        goto unlock;
    }

    nxt_queue_insert_tail(&process->ports, &new_port->link);

    rc = NXT_UNIT_OK;

    new_port->process = process;

unlock:

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
        nxt_unit_process_use(ctx, process, -1);
    }

    return rc;
}


/*
 * Removes the port "port_id" from the library.  This is
 * nxt_unit_find_remove_port() without a request for the removed port's
 * description.
 */
void
nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    nxt_unit_find_remove_port(ctx, port_id, NULL);

    return;
}


/*
 * Removes the port "port_id" under lib->mutex.  When "r_port" is not NULL
 * and the port exists, the removed port's description is copied into it.
 *
 * After the mutex is released, the reference the port held on its process
 * (if any) is dropped.
 */
void
nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    nxt_unit_port_t *r_port)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *owner;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    owner = NULL;

    pthread_mutex_lock(&lib->mutex);

    nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &owner);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(owner != NULL)) {
        nxt_unit_process_use(ctx, owner, -1);
    }
}


/*
 * Removes the port "port_id" from the port hash, closes its descriptors,
 * unlinks it from its process's port list and frees it.  No locking is
 * done here.
 *
 * When the port is found, "*process" (if "process" is not NULL) receives
 * the port's process pointer and "*r_port" (if "r_port" is not NULL)
 * receives a copy of the port description.  When the port is not found,
 * neither output is written.
 */
static void
nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    nxt_unit_port_t *r_port, nxt_unit_process_t **process)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *entry;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* The last argument asks the lookup to delete the entry as well. */
    entry = nxt_unit_port_hash_find(&lib->ports, port_id, 1);

    if (nxt_slow_path(entry == NULL)) {
        nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);

        return;
    }

    nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
                   (int) port_id->pid, (int) port_id->id,
                   entry->port.in_fd, entry->port.out_fd, entry->port.data);

    if (entry->port.in_fd != -1) {
        close(entry->port.in_fd);
    }

    if (entry->port.out_fd != -1) {
        close(entry->port.out_fd);
    }

    /* A port without a process is not on any process port list. */
    if (entry->process != NULL) {
        nxt_queue_remove(&entry->link);
    }

    if (process != NULL) {
        *process = entry->process;
    }

    if (r_port != NULL) {
        *r_port = entry->port;
    }

    free(entry);
}


/*
 * Removes the process entry for "pid" from the process hash and tears it
 * down with nxt_unit_remove_process().  Does nothing apart from a debug
 * message when no such entry exists.
 */
void
nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *victim;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    victim = nxt_unit_process_find(ctx, pid, 1);

    if (nxt_fast_path(victim != NULL)) {
        /* nxt_unit_remove_process() releases lib->mutex itself. */
        nxt_unit_remove_process(ctx, victim);

        return;
    }

    nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);

    pthread_mutex_unlock(&lib->mutex);
}


/*
 * Detaches all ports of "process", removes them and drops the process
 * references involved.
 *
 * Must be entered with lib->mutex locked (nxt_unit_remove_pid() locks it
 * before calling).  The mutex is released inside this function, before
 * any user-supplied remove_port callback runs, and is not re-acquired.
 */
static void
nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /*
     * Work on a local queue so the ports can still be walked after the
     * mutex is dropped.  NOTE(review): nxt_queue_add() presumably splices
     * process->ports onto the local queue - confirm.
     */
    nxt_queue_init(&ports);

    nxt_queue_add(&ports, &process->ports);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        /* One process reference is dropped per port on the list. */
        nxt_unit_process_use(ctx, process, -1);

        /*
         * With port->process cleared, nxt_unit_remove_port_unsafe() skips
         * its own nxt_queue_remove() of this port.
         */
        port->process = NULL;

        /* Shortcut for default callback. */
        if (lib->callbacks.remove_port == nxt_unit_remove_port) {
            nxt_queue_remove(&port->link);

            /*
             * Frees "port" while the mutex is still held.
             * NOTE(review): relies on nxt_queue_each() tolerating removal
             * of the current element - confirm.
             */
            nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
        }

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    /*
     * Ports still on the local queue were not handled by the shortcut;
     * the custom callback is invoked for each of them without the mutex.
     */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        lib->callbacks.remove_port(ctx, &port->port.id);

    } nxt_queue_loop;

    /*
     * NOTE(review): presumably releases the reference obtained by the
     * caller's nxt_unit_process_find() - confirm.
     */
    nxt_unit_process_use(ctx, process, -1);
}


/*
 * Clears the "online" flag of the library instance that owns ctx.
 * NOTE(review): presumably the processing loop polls this flag to stop -
 * confirm against the code that reads lib->online.
 */
void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *impl;

    impl = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    impl->online = 0;
}


/*
 * Resolves port_id to the port's outgoing descriptor and sends buf/oob
 * through it with nxt_unit_port_send().
 *
 * The lookup in lib->ports is done under lib->mutex; the send itself runs
 * without the lock.  Returns -1 if the port is unknown or has no outgoing
 * descriptor, otherwise the result of nxt_unit_port_send().
 */
static ssize_t
nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int                   out_fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *found;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    found = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (!nxt_fast_path(found != NULL)) {
        nxt_unit_warn(ctx, "port_send: port %d,%d not found",
                      (int) port_id->pid, (int) port_id->id);

        pthread_mutex_unlock(&lib->mutex);

        return -1;
    }

    /* Only the descriptor value leaves the critical section. */
    out_fd = found->port.out_fd;

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(out_fd == -1)) {
        nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
                      (int) port_id->pid, (int) port_id->id);

        return -1;
    }

    nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
                   (int) port_id->pid, (int) port_id->id, out_fd);

    return nxt_unit_port_send(ctx, out_fd, buf, buf_size, oob, oob_size);
}


/*
 * Sends one message on fd with sendmsg(): buf/buf_size is the payload,
 * oob/oob_size the ancillary (control) data.
 *
 * A call interrupted by a signal (EINTR) is transparently restarted.
 * Returns the sendmsg() result; on failure a warning is logged, -1 is
 * returned and errno still holds the sendmsg() error code.
 */
ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int            err;
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    /* sendmsg() does not modify the buffers; the casts only drop const. */
    iov[0].iov_base = (void *) buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = (void *) oob;
    msg.msg_controllen = oob_size;

    do {
        res = sendmsg(fd, &msg, 0);
    } while (nxt_slow_path(res == -1 && errno == EINTR));

    if (nxt_slow_path(res == -1)) {
        /*
         * Capture errno once: strerror() and the logging path may change
         * it, and argument evaluation order is unspecified.
         */
        err = errno;

        nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
                      fd, (int) buf_size, strerror(err), err);

        errno = err;

    } else {
        nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
                       (int) res);
    }

    return res;
}


/*
 * Resolves port_id to the port's incoming descriptor and receives one
 * message from it with nxt_unit_port_recv().
 *
 * The lookup in lib->ports is done under lib->mutex; the receive itself
 * runs without the lock.  When port_id is the context's own read_port_id
 * object, the descriptor is also stored in ctx_impl->read_port_fd.
 * Returns -1 if the port is unknown or has no incoming descriptor,
 * otherwise the result of nxt_unit_port_recv().
 */
static ssize_t
nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   in_fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *found;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    found = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (!nxt_fast_path(found != NULL)) {
        nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);

        pthread_mutex_unlock(&lib->mutex);

        return -1;
    }

    /* Only the descriptor value leaves the critical section. */
    in_fd = found->port.in_fd;

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(in_fd == -1)) {
        return -1;
    }

    nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
                   (int) port_id->pid, (int) port_id->id, in_fd);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Pointer identity, not value equality, selects the context's port. */
    if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
        ctx_impl->read_port_fd = in_fd;
    }

    return nxt_unit_port_recv(ctx, in_fd, buf, buf_size, oob, oob_size);
}


/*
 * Receives one message from fd with recvmsg(): the payload goes to
 * buf (at most buf_size bytes), ancillary (control) data to oob (at most
 * oob_size bytes).
 *
 * A call interrupted by a signal (EINTR) is transparently restarted.
 * Returns the recvmsg() result; on failure a warning is logged, -1 is
 * returned and errno still holds the recvmsg() error code.
 */
ssize_t
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
    void *oob, size_t oob_size)
{
    int            err;
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = oob;
    msg.msg_controllen = oob_size;

    do {
        res = recvmsg(fd, &msg, 0);
    } while (nxt_slow_path(res == -1 && errno == EINTR));

    if (nxt_slow_path(res == -1)) {
        /*
         * Capture errno once: strerror() and the logging path may change
         * it, and argument evaluation order is unspecified.
         */
        err = errno;

        nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
                      fd, strerror(err), err);

        errno = err;

    } else {
        nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
    }

    return res;
}


/*
 * Key comparison callback of lvlhsh_ports_proto: decides whether the
 * stored port "data" corresponds to the key carried by the query.
 *
 * Returns NXT_OK when the key has the size of nxt_unit_port_hash_id_t and
 * both its pid and id equal those of the stored port, NXT_DECLINED
 * otherwise.
 */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *stored;
    nxt_unit_port_hash_id_t  *wanted;

    /* The key bytes are only inspected once the length is known to fit. */
    if (lhq->key.length != sizeof(nxt_unit_port_hash_id_t)) {
        return NXT_DECLINED;
    }

    stored = data;
    wanted = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (wanted->pid != stored->id.pid) {
        return NXT_DECLINED;
    }

    if (wanted->id != stored->id.id) {
        return NXT_DECLINED;
    }

    return NXT_OK;
}


/*
 * Level-hash prototype for the port table.  nxt_unit_port_hash_lhq()
 * installs it into every port query; it supplies
 * nxt_unit_port_hash_test() as the key comparison callback together with
 * the nxt_lvlhsh_alloc()/nxt_lvlhsh_free() memory callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Prepares a level-hash query for the port identified by port_id.
 *
 * port_hash_id is caller-provided storage for the (pid, id) key; lhq keeps
 * a pointer to it, so it must stay alive while the query is used.  The
 * key hash is cached in port_id->hash: a non-zero value is reused, a zero
 * value triggers a murmur hash computation whose result is written back
 * to port_id.  lhq->replace and lhq->value are left for the caller.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    /* Parts of the query that do not depend on the hash value. */
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
    lhq->key.start = (u_char *) port_hash_id;
    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);

    /* The key must be filled in before it can be hashed below. */
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

        return;
    }

    lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

    port_id->hash = lhq->key_hash;

    nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                   (int) port_id->pid, (int) port_id->id,
                   (int) port_id->hash);
}


/*
 * Inserts port into port_hash, keyed by port->id.  An existing entry is
 * not replaced (lhq.replace is 0).
 *
 * Returns NXT_UNIT_OK if nxt_lvlhsh_insert() reports NXT_OK and
 * NXT_UNIT_ERROR for any other result.
 */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_lvlhsh_query_t       query;
    nxt_unit_port_hash_id_t  key;

    nxt_unit_port_hash_lhq(&query, &key, &port->id);

    query.value = port;
    query.replace = 0;

    if (nxt_lvlhsh_insert(port_hash, &query) != NXT_OK) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Looks up the port identified by port_id in port_hash.  With a non-zero
 * "remove" the entry is looked up via nxt_lvlhsh_delete() instead of
 * nxt_lvlhsh_find().
 *
 * Returns the value left in the query when the hash call reports NXT_OK,
 * NULL for any other result.
 */
static nxt_unit_port_impl_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                rc;
    nxt_lvlhsh_query_t       query;
    nxt_unit_port_hash_id_t  key;

    nxt_unit_port_hash_lhq(&query, &key, port_id);

    rc = remove ? nxt_lvlhsh_delete(port_hash, &query)
                : nxt_lvlhsh_find(port_hash, &query);

    if (rc != NXT_OK) {
        return NULL;
    }

    return query.value;
}


/*
 * Key comparison callback of lvlhsh_requests_proto.  It reports a match
 * (NXT_OK) for every candidate without looking at the query key or at
 * the stored entry.
 *
 * NOTE(review): presumably this relies on the 32-bit hash of a 4-byte
 * stream id never colliding between distinct ids - confirm against
 * nxt_murmur_hash2() before reusing this prototype for other key types.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


/*
 * Level-hash prototype for the request table, used by
 * nxt_unit_request_hash_add() and nxt_unit_request_hash_find().  It
 * supplies nxt_unit_request_hash_test() as the key comparison callback
 * together with the nxt_lvlhsh_alloc()/nxt_lvlhsh_free() memory callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Inserts req_impl into request_hash, keyed by req_impl->stream.  The key
 * points into req_impl itself.  An existing entry is not replaced.
 *
 * Returns NXT_UNIT_OK if nxt_lvlhsh_insert() reports NXT_OK and
 * NXT_UNIT_ERROR for any other result.
 */
static int
nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl)
{
    uint32_t            *key;
    nxt_lvlhsh_query_t  query;

    key = &req_impl->stream;

    query.proto = &lvlhsh_requests_proto;
    query.pool = NULL;
    query.key.start = (u_char *) key;
    query.key.length = sizeof(*key);
    query.key_hash = nxt_murmur_hash2(key, sizeof(*key));
    query.value = req_impl;
    query.replace = 0;

    if (nxt_lvlhsh_insert(request_hash, &query) != NXT_OK) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Looks up the request with the given stream id in request_hash.  With a
 * non-zero "remove" the entry is looked up via nxt_lvlhsh_delete()
 * instead of nxt_lvlhsh_find().
 *
 * Returns the value left in the query when the hash call reports NXT_OK,
 * NULL for any other result.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
    int remove)
{
    nxt_int_t           rc;
    nxt_lvlhsh_query_t  query;

    /* The key is the by-value parameter; it lives until we return. */
    query.proto = &lvlhsh_requests_proto;
    query.pool = NULL;
    query.key.start = (u_char *) &stream;
    query.key.length = sizeof(stream);
    query.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));

    rc = remove ? nxt_lvlhsh_delete(request_hash, &query)
                : nxt_lvlhsh_find(request_hash, &query);

    if (rc != NXT_OK) {
        return NULL;
    }

    return query.value;
}


/*
 * Formats a printf-style message and writes it as a single line to the
 * library log descriptor.
 *
 * ctx may be NULL: the pid is then taken from getpid() and the line is
 * written to STDERR_FILENO instead of lib->log_fd.  level is passed to
 * nxt_unit_snprint_prefix(), which builds the line prefix.  Text that does
 * not fit into the NXT_MAX_ERROR_STR byte buffer is cut and the line ends
 * with "[...]".  If the write fails, the line is echoed to stderr.
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, len, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    /* The last byte of msg is reserved for the trailing newline. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    len = vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    if (nxt_slow_path(len < 0)) {
        /* Formatting failed: emit the prefix alone, never step back. */
        len = 0;
    }

    /*
     * vsnprintf() returns the untruncated length, so a value that reaches
     * the size passed to it means the text was cut.  This includes the
     * exact-fit case, where only the final character was lost to the
     * terminating NUL.  Comparing before advancing also keeps p inside
     * the buffer.
     */
    if (nxt_slow_path(len >= end - p)) {
        memcpy(end - 5, "[...]", 5);
        p = end;

    } else {
        p += len;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Request-scoped variant of the logger: formats a printf-style message
 * and writes it as a single line, with "#<stream>: " inserted after the
 * common prefix when req is not NULL.
 *
 * req may be NULL: the pid is then taken from getpid(), the line is
 * written to STDERR_FILENO instead of lib->log_fd and no stream tag is
 * added.  Text that does not fit into the NXT_MAX_ERROR_STR byte buffer
 * is cut and the line ends with "[...]".  If the write fails, the line is
 * echoed to stderr.
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, len, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    /* The last byte of msg is reserved for the trailing newline. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    len = vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    if (nxt_slow_path(len < 0)) {
        /* Formatting failed: emit the prefix alone, never step back. */
        len = 0;
    }

    /*
     * vsnprintf() returns the untruncated length, so a value that reaches
     * the size passed to it means the text was cut.  This includes the
     * exact-fit case, where only the final character was lost to the
     * terminating NUL.  Comparing before advancing also keeps p inside
     * the buffer.
     */
    if (nxt_slow_path(len >= end - p)) {
        memcpy(end - 5, "[...]", 5);
        p = end;

    } else {
        p += len;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Level names printed in the log line prefix.  The table is indexed
 * directly by the "level" argument of the logging functions (see
 * nxt_unit_snprint_prefix()), so the order of the entries is significant.
 */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};


/*
 * Writes the common log line prefix at p (bounded by end):
 * "YYYY/MM/DD hh:mm:ss.mmm [level] pid#tid [unit] ", using the current
 * local time.
 *
 * level selects the name from nxt_unit_log_levels; values outside the
 * table are clamped to its first or last entry instead of indexing out of
 * bounds.  Returns the position just past the text produced.
 */
static char *
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
{
    int              last_level;
    struct tm        tm;
    struct timespec  ts;

    last_level = (int) (sizeof(nxt_unit_log_levels)
                        / sizeof(nxt_unit_log_levels[0])) - 1;

    /* The level comes from public logging entry points; do not trust it. */
    if (level < 0) {
        level = 0;

    } else if (level > last_level) {
        level = last_level;
    }

    (void) clock_gettime(CLOCK_REALTIME, &ts);

#if (NXT_HAVE_LOCALTIME_R)
    (void) localtime_r(&ts.tv_sec, &tm);
#else
    tm = *localtime(&ts.tv_sec);
#endif

    /* tv_nsec is narrowed to int first, then reduced to milliseconds. */
    p += snprintf(p, end - p,
                  "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec,
                  (int) ts.tv_nsec / 1000000);

    p += snprintf(p, end - p,
                  "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
                  (int) pid,
                  (uint64_t) (uintptr_t) nxt_thread_get_tid());

    return p;
}


/* The function required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free(). */

/*
 * Allocates "size" bytes aligned to "alignment" via posix_memalign().
 * Returns the new block, or NULL if posix_memalign() reports an error.
 */
void *
nxt_memalign(size_t alignment, size_t size)
{
    void       *mem;
    nxt_err_t  rc;

    rc = posix_memalign(&mem, alignment, size);

    /* mem is only meaningful when the call succeeded. */
    return nxt_fast_path(rc == 0) ? mem : NULL;
}

#if (NXT_DEBUG)

/*
 * Out-of-line wrapper around free(); compiled only when NXT_DEBUG is set.
 * NOTE(review): presumably non-debug builds map nxt_free to free()
 * elsewhere - confirm.
 */
void
nxt_free(void *p)
{
    free(p);
}

#endif